Source code for rx.linq.observable.amb

from rx.core import Observable, AnonymousObservable
from rx.disposables import CompositeDisposable, SingleAssignmentDisposable
from rx.internal import extensionmethod, extensionclassmethod


@extensionmethod(Observable, instancemethod=True)
def amb(self, right_source):
    """Propagates the observable sequence that reacts first.

    Arguments:
      right_source (Observable): Second observable sequence.

    Returns:
      Observable: an observable sequence that surfaces either of the given
      sequences, whichever reacted first.
    """

    left_source = self
    right_source = Observable.from_future(right_source)

    def subscribe(observer):
        choice = [None]
        left_choice = 'L'
        right_choice = 'R',
        left_subscription = SingleAssignmentDisposable()
        right_subscription = SingleAssignmentDisposable()

        def choice_left():
            if not choice[0]:
                choice[0] = left_choice
                right_subscription.dispose()

        def choice_right():
            if not choice[0]:
                choice[0] = right_choice
                left_subscription.dispose()

        def on_next_left(value):
            with self.lock:
                choice_left()
            if choice[0] == left_choice:
                observer.on_next(value)

        def on_error_left(err):
            with self.lock:
                choice_left()
            if choice[0] == left_choice:
                observer.on_error(err)

        def on_completed_left():
            with self.lock:
                choice_left()
            if choice[0] == left_choice:
                observer.on_completed()

        ld = left_source.subscribe(on_next_left, on_error_left,
                                   on_completed_left)
        left_subscription.disposable = ld

        def on_next_right(value):
            with self.lock:
                choice_right()
            if choice[0] == right_choice:
                observer.on_next(value)

        def on_error_right(err):
            with self.lock:
                choice_right()
            if choice[0] == right_choice:
                observer.on_error(err)

        def on_completed_right():
            with self.lock:
                choice_right()
            if choice[0] == right_choice:
                observer.on_completed()

        rd = right_source.subscribe(on_next_right, on_error_right,
                                    on_completed_right)
        right_subscription.disposable = rd
        return CompositeDisposable(left_subscription, right_subscription)
    return AnonymousObservable(subscribe)


@extensionclassmethod(Observable)
def amb(cls, *args):
    """Propagates the observable sequence that reacts first.

    Example::
        
        winner = rx.Observable.amb(xs, ys, zs)

    Returns:
      Observable: an observable sequence that surfaces any of the given sequences,
      whichever reacted first.
    """

    acc = Observable.never()
    if isinstance(args[0], list):
        items = args[0]
    else:
        items = list(args)

    def func(previous, current):
        return previous.amb(current)

    for item in items:
        acc = func(acc, item)

    return acc