Source code for rx.linq.observable.finallyaction

from rx.core import Observable, AnonymousObservable, Disposable
from rx.internal import extensionmethod


@extensionmethod(Observable)
def finally_action(self, action):
    """Invokes a specified action after the source observable sequence
    terminates gracefully or exceptionally.

    Example:
    res = observable.finally(lambda: print('sequence ended')

    Keyword arguments:
    action -- {Function} Action to invoke after the source observable
        sequence terminates.
    Returns {Observable} Source sequence with the action-invoking
    termination behavior applied.
    """

    source = self

    def subscribe(observer):
        try:
            subscription = source.subscribe(observer)
        except Exception:
            action()
            raise

        def dispose():
            try:
                subscription.dispose()
            finally:
                action()

        return Disposable.create(dispose)
    return AnonymousObservable(subscribe)