Source code for rx.linq.observable.when

from rx import Observable, AnonymousObserver, AnonymousObservable
from rx.disposables import CompositeDisposable
from rx.internal import extensionclassmethod


@extensionclassmethod(Observable)
def when(cls, *args):
    """Joins together the results from several patterns.

    :param Observable cls: Observable class.
    :param list[Plan] args: A series of plans (specified as a list of as a
        series of arguments) created by use of the Then operator on patterns.
    :returns: Observable sequence with the results form matching several
        patterns.
    :rtype: Observable
    """

    plans = args[0] if len(args) and isinstance(args[0], list) else list(args)

    def subscribe(observer):
        active_plans = []
        external_subscriptions = {}

        def on_error(err):
            for v in external_subscriptions.values():
                v.on_error(err)
            observer.on_error(err)

        out_observer = AnonymousObserver(observer.on_next, on_error, observer.on_completed)

        def deactivate(active_plan):
            active_plans.remove(active_plan)
            if not len(active_plans):
                observer.on_completed()
        try:
            for plan in plans:
                active_plans.append(plan.activate(external_subscriptions, 
                                                  out_observer, deactivate))
        except Exception as ex:
            Observable.throw(ex).subscribe(observer)

        group = CompositeDisposable()
        for join_observer in external_subscriptions.values():
            join_observer.subscribe()
            group.add(join_observer)

        return group

    return AnonymousObservable(subscribe)