Source code for rx.linq.observable.fromcallback

from rx.core import Observable, AnonymousObservable
from rx.internal import extensionclassmethod


@extensionclassmethod(Observable)
def from_callback(cls, func, selector=None):
    """Converts a callback function to an observable sequence.

    Keyword arguments:
    func -- {Function} Function with a callback as the last parameter to
        convert to an Observable sequence.
    selector -- {Function} [Optional] A selector which takes the arguments
        from the callback to produce a single item to yield on next.

    Returns {Function} A function, when executed with the required
    parameters minus the callback, produces an Observable sequence with a
    single value of the arguments to the callback as a list.
    """

    def function(*args):
        arguments = list(args)

        def subscribe(observer):
            def handler(*args):
                results = list(args)
                if selector:
                    try:
                        results = selector(args)
                    except Exception as err:
                        observer.on_error(err)
                        return

                    observer.on_next(results)
                else:
                    if isinstance(results, list) and len(results) <= 1:
                        observer.on_next(*results)
                    else:
                        observer.on_next(results)

                    observer.on_completed()

            arguments.append(handler)
            func(*arguments)

        return AnonymousObservable(subscribe)
    return function