Source code for rx.linq.observable.tofuture

import rx
from rx.core import Observable
from rx.internal import extensionmethod


@extensionmethod(Observable)
def to_future(self, future_ctor=None):
    """Converts an existing observable sequence to a Future

    Example:
    future = rx.Observable.return_value(42).to_future(trollius.Future);

    With config:
    rx.config["Future"] = trollius.Future
    future = rx.Observable.return_value(42).to_future()

    future_ctor -- {Function} [Optional] The constructor of the future.
        If not provided, it looks for it in rx.config.Future.

    Returns {Future} An future with the last value from the observable
    sequence.
    """

    future_ctor = future_ctor or rx.config.get("Future")
    if not future_ctor:
        raise Exception('Future type not provided nor in rx.config.Future')

    source = self

    future = future_ctor()

    value = [None]
    has_value = [False]

    def on_next(v):
        value[0] = v
        has_value[0] = True

    def on_error(err):
        future.set_exception(err)

    def on_completed():
        if has_value[0]:
            future.set_result(value[0])

    source.subscribe(on_next, on_error, on_completed)

    # No cancellation can be done
    return future


@extensionmethod(Observable)
def __await__(self):
    """Awaits the given observable
    :returns: The last item of the observable sequence.
    :rtype: Any
    :raises TypeError: If key is not of type int or slice
    """
    return iter(self.to_future())