Source code for rx.linq.observable.concat

from rx.core import Observable, AnonymousObservable, Disposable
from rx.disposables import SingleAssignmentDisposable, CompositeDisposable, SerialDisposable
from rx.concurrency import immediate_scheduler
from rx.internal import extensionmethod, extensionclassmethod
from rx.internal import Enumerable


@extensionmethod(Observable, instancemethod=True)
def concat(self, *args):
    """Concatenates all the observable sequences. This takes in either an
    array or variable arguments to concatenate.

    1 - concatenated = xs.concat(ys, zs)
    2 - concatenated = xs.concat([ys, zs])

    Returns an observable sequence that contains the elements of each given
    sequence, in sequential order.
    """
    if isinstance(args[0], list):
        items = args[0]
    else:
        items = list(args)

    items.insert(0, self)
    return Observable.concat(items)


@extensionmethod(Observable)
def __add__(self, other):
    """Pythonic version of concat

    Example:
    zs = xs + ys
    Returns self.concat(other)"""

    return self.concat(other)


@extensionmethod(Observable)
def __iadd__(self, other):
    """Pythonic use of concat

    Example:
    xs += ys

    Returns self.concat(self, other)"""

    return self.concat(self, other)


@extensionclassmethod(Observable)
def concat(cls, *args):
    """Concatenates all the observable sequences.

    1 - res = Observable.concat(xs, ys, zs)
    2 - res = Observable.concat([xs, ys, zs])

    Returns an observable sequence that contains the elements of each given
    sequence, in sequential order.
    """
    if isinstance(args[0], list) or isinstance(args[0], Enumerable):
        sources = args[0]
    else:
        sources = list(args)

    def subscribe(observer):
        enum = iter(sources)
        is_disposed = [False]
        subscription = SerialDisposable()

        def action(action1, state=None):
            if is_disposed[0]:
                return
            try:
                current = next(enum)
            except StopIteration:
                observer.on_completed()
            except Exception as ex:
                observer.on_error(ex)
            else:
                d = SingleAssignmentDisposable()
                subscription.disposable = d
                d.disposable = current.subscribe(
                    observer.on_next,
                    observer.on_error,
                    lambda: action1()
                )

        cancelable = immediate_scheduler.schedule_recursive(action)

        def dispose():
            is_disposed[0] = True
        return CompositeDisposable(subscription, cancelable, Disposable.create(dispose))
    return AnonymousObservable(subscribe)


@extensionmethod(Observable)
def concat_all(self):
    """Concatenates an observable sequence of observable sequences.

    Returns an observable sequence that contains the elements of each
    observed inner sequence, in sequential order.
    """

    return self.merge(1)