Source code for rx.linq.observable.todict

from rx import Observable, AnonymousObservable
from rx.internal import extensionmethod

def _to_dict(source, map_type, key_selector, element_selector):
    """Build an observable that collects ``source`` into a single mapping.

    Arguments:
      source: The observable whose values are collected. It is subscribed
        to with ``on_next``, ``on_error`` and ``on_completed`` callbacks.
      map_type: A zero-argument callable returning an empty container that
        supports item assignment. It is called once per subscription.
      key_selector: A function called with each value to produce its key.
      element_selector: An optional function called with each value to
        produce the element stored under the key. If ``None``, the value
        itself is stored.

    Returns:
      An :class:`AnonymousObservable` which, when the source completes,
      emits the populated container as a single value and then completes.
      Errors from the source, and exceptions raised by either selector, are
      forwarded to the observer's ``on_error``.
    """
    def subscribe(observer):
        # A fresh container per subscription, so subscribers never share
        # (or observe each other's) accumulated state.
        m = map_type()

        def on_next(x):
            try:
                key = key_selector(x)
            except Exception as ex:
                observer.on_error(ex)
                return

            element = x
            # Compare against None explicitly: a selector that happens to be
            # falsy (e.g. a callable object defining __bool__ or __len__)
            # must still be applied rather than silently skipped.
            if element_selector is not None:
                try:
                    element = element_selector(x)
                except Exception as ex:
                    observer.on_error(ex)
                    return

            # With a dict-like container, a later value replaces an earlier
            # one that produced the same key. An error raised by the
            # assignment itself is not caught here.
            m[key] = element

        def on_completed():
            # The container is emitted exactly once, just before completion.
            observer.on_next(m)
            observer.on_completed()

        return source.subscribe(on_next, observer.on_error, on_completed)
    return AnonymousObservable(subscribe)


@extensionmethod(Observable)
def to_dict(self, key_selector, element_selector=None):
    """Collects the observable sequence into a single :class:`dict`.

    Each value of the sequence is stored under the key computed by
    ``key_selector``. When several values produce the same key, the last
    one wins.

    Arguments:
      key_selector (types.FunctionType): A function which computes the
        :class:`dict` key for a value of the sequence.

    Keyword Arguments:
      element_selector (types.FunctionType): An optional function which
        computes the element stored in the :class:`dict` for a value of the
        sequence. If omitted, the value itself is stored.

    Returns:
      (Observable): An observable sequence which emits one :class:`dict`,
        built from the values of the source sequence, once the source
        completes.
    """
    return _to_dict(
        source=self,
        map_type=dict,
        key_selector=key_selector,
        element_selector=element_selector,
    )