Source code for rx.linq.observable.using

from rx.core import Observable, AnonymousObservable, Disposable
from rx.disposables import CompositeDisposable
from rx.internal import extensionclassmethod


@extensionclassmethod(Observable)
def using(cls, resource_factory, observable_factory):
    """Constructs an observable sequence that depends on a resource object,
    whose lifetime is tied to the resulting observable sequence's lifetime.

    Example::

        res = rx.Observable.using(lambda: AsyncSubject(), lambda s: s)

    Arguments:
      resource_factory (types.FunctionType): Factory function to obtain a
        resource object. It is called with no arguments on every
        subscription. A return value of None means "no resource"; any
        other value is placed in the disposable returned to the subscriber.
      observable_factory (types.FunctionType): Factory function to obtain an
        observable sequence that depends on the obtained resource. It is
        called with the value returned by resource_factory (possibly None).

    Returns:
      An observable sequence whose lifetime controls the lifetime of
      the dependent resource object.

    An exception raised by either factory is not propagated to the caller
    of subscribe; it is forwarded to the observer through
    Observable.throw_exception instead.
    """

    def subscribe(observer):
        # Placeholder used until (and unless) a real resource is obtained.
        disposable = Disposable.empty()
        try:
            resource = resource_factory()
            # Compare against None explicitly: a resource object may
            # legitimately evaluate as falsy (e.g. it defines __len__ or
            # __bool__) and must still be tied to the subscription.
            if resource is not None:
                disposable = resource

            source = observable_factory(resource)
        except Exception as exception:
            # The resource may already have been acquired when the
            # observable factory failed, so it stays in the composite.
            d = Observable.throw_exception(exception).subscribe(observer)
            return CompositeDisposable(d, disposable)

        return CompositeDisposable(source.subscribe(observer), disposable)
    return AnonymousObservable(subscribe)