# Source code for rx.linq.observable.fromfuture

from rx.core import Observable, AnonymousObservable
from rx.internal.utils import is_future
from rx.internal import extensionclassmethod


@extensionclassmethod(Observable)
def from_future(cls, future):
    """Converts a Future to an Observable sequence

    Each subscription registers a done-callback on the future. When the
    future finishes, its result is emitted followed by completion; if
    retrieving the result raises (including cancellation of the future),
    the exception is delivered through on_error instead.

    Disposing a subscription calls cancel() on the wrapped future.

    Keyword Arguments:
    future -- {Future} A Python 3 compatible future.
        https://docs.python.org/3/library/asyncio-task.html#future
        http://www.tornadoweb.org/en/stable/concurrent.html#tornado.concurrent.Future

    Returns {Observable} An Observable sequence which wraps the existing
    future success and failure. If the argument is not a future it is
    returned unchanged.
    """

    # Since Python 3.8 asyncio.CancelledError derives from BaseException,
    # so a plain `except Exception` lets it escape the done-callback and
    # the observer would never be terminated. Import it defensively so
    # interpreters without asyncio keep working.
    try:
        from asyncio import CancelledError
    except ImportError:
        forwarded_errors = (Exception,)
    else:
        forwarded_errors = (Exception, CancelledError)

    def subscribe(observer):
        def done(future):
            try:
                value = future.result()
            except forwarded_errors as ex:
                observer.on_error(ex)
            else:
                observer.on_next(value)
                observer.on_completed()

        future.add_done_callback(done)

        def dispose():
            # getattr (rather than attribute access) so a future-like
            # object without cancel() is skipped instead of raising.
            if future and getattr(future, "cancel", None):
                future.cancel()
        return dispose

    return AnonymousObservable(subscribe) if is_future(future) else future