Source code for rx.linq.observable.materialize

from rx.core import Observable, AnonymousObservable
from rx.core.notification import OnNext, OnError, OnCompleted
from rx.internal import extensionmethod


@extensionmethod(Observable)
def materialize(self):
    """Materializes the implicit notifications of an observable sequence as
    explicit notification values.

    Returns an observable sequence containing the materialized notification
    values from the source sequence.
    """

    source = self

    def subscribe(observer):
        def on_next(value):
            observer.on_next(OnNext(value))

        def on_error(exception):
            observer.on_next(OnError(exception))
            observer.on_completed()

        def on_completed():
            observer.on_next(OnCompleted())
            observer.on_completed()

        return source.subscribe(on_next, on_error, on_completed)
    return AnonymousObservable(subscribe)