Source code for rx.linq.observable.merge

from rx.core import Scheduler, Observable, AnonymousObservable
from rx.disposables import CompositeDisposable, SingleAssignmentDisposable
from rx.concurrency import immediate_scheduler
from rx.internal import extensionmethod, extensionclassmethod


@extensionmethod(Observable, instancemethod=True)
def merge(self, *args, **kwargs):
    """Merges an observable sequence of observable sequences into an
    observable sequence, limiting the number of concurrent subscriptions
    to inner sequences. Or merges two observable sequences into a single
    observable sequence.

    1 - merged = sources.merge(1)
    2 - merged = source.merge(other_source)

    max_concurrent_or_other [Optional] Maximum number of inner observable
        sequences being subscribed to concurrently or the second
        observable sequence.

    Returns the observable sequence that merges the elements of the inner
    sequences.
    """

    if not isinstance(args[0], int):
        args = args + tuple([self])
        return Observable.merge(*args, **kwargs)

    max_concurrent = args[0]
    sources = self

    def subscribe(observer):
        active_count = [0]
        group = CompositeDisposable()
        is_stopped = [False]
        q = []

        def subscribe(xs):
            subscription = SingleAssignmentDisposable()
            group.add(subscription)

            def on_completed():
                group.remove(subscription)
                if len(q):
                    s = q.pop(0)
                    subscribe(s)
                else:
                    active_count[0] -= 1
                    if is_stopped[0] and active_count[0] == 0:
                        observer.on_completed()

            subscription.disposable = xs.subscribe(observer.on_next,
                                                   observer.on_error,
                                                   on_completed)

        def on_next(inner_source):
            if active_count[0] < max_concurrent:
                active_count[0] += 1
                subscribe(inner_source)
            else:
                q.append(inner_source)

        def on_completed():
            is_stopped[0] = True
            if active_count[0] == 0:
                observer.on_completed()

        group.add(sources.subscribe(on_next, observer.on_error, on_completed))
        return group
    return AnonymousObservable(subscribe)


@extensionclassmethod(Observable)  # noqa
def merge(cls, *args):
    """Merges all the observable sequences into a single observable
    sequence. The scheduler is optional and if not specified, the
    immediate scheduler is used.

    1 - merged = rx.Observable.merge(xs, ys, zs)
    2 - merged = rx.Observable.merge([xs, ys, zs])
    3 - merged = rx.Observable.merge(scheduler, xs, ys, zs)
    4 - merged = rx.Observable.merge(scheduler, [xs, ys, zs])

    Returns the observable sequence that merges the elements of the
    observable sequences.
    """

    if not args[0]:
        scheduler = immediate_scheduler
        sources = args[1:]
    elif isinstance(args[0], Scheduler):
        scheduler = args[0]
        sources = args[1:]
    else:
        scheduler = immediate_scheduler
        sources = args[:]

    if isinstance(sources[0], list):
        sources = sources[0]

    return Observable.from_(sources, scheduler).merge_all()


@extensionmethod(Observable, alias="merge_observable")
def merge_all(self):
    """Merges an observable sequence of observable sequences into an
    observable sequence.

    Returns the observable sequence that merges the elements of the inner
    sequences.
    """

    sources = self

    def subscribe(observer):
        group = CompositeDisposable()
        is_stopped = [False]
        m = SingleAssignmentDisposable()
        group.add(m)

        def on_next(inner_source):
            inner_subscription = SingleAssignmentDisposable()
            group.add(inner_subscription)

            inner_source = Observable.from_future(inner_source)

            def on_next(x):
                observer.on_next(x)

            def on_completed():
                group.remove(inner_subscription)
                if is_stopped[0] and len(group) == 1:
                    observer.on_completed()

            disposable = inner_source.subscribe(on_next, observer.on_error,
                                                on_completed)
            inner_subscription.disposable = disposable

        def on_completed():
            is_stopped[0] = True
            if len(group) == 1:
                observer.on_completed()

        m.disposable = sources.subscribe(on_next, observer.on_error,
                                         on_completed)
        return group

    return AnonymousObservable(subscribe)