Source code for rx.linq.observable.timeoutwithselector

from rx.core import Observable, AnonymousObservable
from rx.disposables import CompositeDisposable, \
    SingleAssignmentDisposable, SerialDisposable
from rx.internal import extensionmethod


@extensionmethod(Observable)
def timeout_with_selector(self, first_timeout=None,
                          timeout_duration_selector=None, other=None):
    """Returns the source observable sequence, switching to the other
    observable sequence if a timeout is signaled.

    Examples::

        res = source.timeout_with_selector(rx.Observable.timer(500))
        res = source.timeout_with_selector(rx.Observable.timer(500),
                lambda x: rx.Observable.timer(200))
        res = source.timeout_with_selector(rx.Observable.timer(500),
                lambda x: rx.Observable.timer(200)),
                rx.Observable.return_value(42))

    Keyword Arguments:

      first_timeout (Observable): Observable sequence that represents the
        timeout for the first element. If not provided, this defaults to
        :func:`Observable.never`.
      timeout_duration_selector: Selector to retrieve an
        observable sequence that represents the timeout between the current
        element and the next element.
      other: Sequence to return in case of a timeout. If not
        provided, this is set to :func:`Observable.throw_exception`.

    Returns:
        (Observable): Returns the source sequence switching to the other
        sequence in case of a timeout.
    """

    first_timeout = first_timeout or Observable.never()
    other = other or Observable.throw_exception(Exception('Timeout'))
    source = self

    def subscribe(observer):
        subscription = SerialDisposable()
        timer = SerialDisposable()
        original = SingleAssignmentDisposable()

        subscription.disposable = original

        switched = False
        _id = [0]

        def set_timer(timeout):
            my_id = _id[0]

            def timer_wins():
                return _id[0] == my_id

            d = SingleAssignmentDisposable()
            timer.disposable = d

            def on_next(x):
                if timer_wins():
                    subscription.disposable = other.subscribe(observer)

                d.dispose()

            def on_error(e):
                if timer_wins():
                    observer.on_error(e)

            def on_completed():
                if timer_wins():
                    subscription.disposable = other.subscribe(observer)

            d.disposable = timeout.subscribe(on_next, on_error, on_completed)

        set_timer(first_timeout)

        def observer_wins():
            res = not switched
            if res:
                _id[0] += 1

            return res

        def on_next(x):
            if observer_wins():
                observer.on_next(x)
                timeout = None
                try:
                    timeout = timeout_duration_selector(x)
                except Exception as e:
                    observer.on_error(e)
                    return

                set_timer(timeout)

        def on_error(e):
            if observer_wins():
                observer.on_error(e)

        def on_completed():
            if observer_wins():
                observer.on_completed()

        original.disposable = source.subscribe(on_next, on_error, on_completed)
        return CompositeDisposable(subscription, timer)
    return AnonymousObservable(subscribe)