Source code for rx.linq.observable.debounce

from rx.core import Observable, AnonymousObservable
from rx.disposables import CompositeDisposable, \
    SingleAssignmentDisposable, SerialDisposable
from rx.concurrency import timeout_scheduler
from rx.internal import extensionmethod


@extensionmethod(Observable, alias="throttle_with_timeout")
def debounce(self, duetime, scheduler=None):
    """Ignores values from an observable sequence which are followed by
    another value before duetime.

    Example::

        res = source.debounce(5000) # 5 seconds
        res = source.debounce(5000, scheduler)

    Args:

        duetime (Number): Duration of the throttle period for each value
            (specified as an integer denoting milliseconds).

    Keyword Arguments:

        scheduler (Scheduler): Scheduler to run the throttle
            timers on. If not specified, the timeout scheduler is used.

    Returns:
        Observable: The debounced sequence.
    """

    scheduler = scheduler or timeout_scheduler
    source = self

    def subscribe(observer):
        cancelable = SerialDisposable()
        has_value = [False]
        value = [None]
        _id = [0]

        def on_next(x):
            has_value[0] = True
            value[0] = x
            _id[0] += 1
            current_id = _id[0]
            d = SingleAssignmentDisposable()
            cancelable.disposable = d

            def action(scheduler, state=None):
                if has_value[0] and _id[0] == current_id:
                    observer.on_next(value[0])
                has_value[0] = False

            d.disposable = scheduler.schedule_relative(duetime, action)

        def on_error(exception):
            cancelable.dispose()
            observer.on_error(exception)
            has_value[0] = False
            _id[0] += 1

        def on_completed():
            cancelable.dispose()
            if has_value[0]:
                observer.on_next(value[0])

            observer.on_completed()
            has_value[0] = False
            _id[0] += 1

        subscription = source.subscribe(on_next, on_error, on_completed)
        return CompositeDisposable(subscription, cancelable)
    return AnonymousObservable(subscribe)


@extensionmethod(Observable)
def throttle_with_selector(self, throttle_duration_selector):
    """Ignores values from an observable sequence which are followed by
    another value within a computed throttle duration.

    Example::
        
        res = source.throttle_with_selector(lambda x: rx.Scheduler.timer(x+x))

    Keyword Arguments:
    
        throttle_duration_selector: Selector function to retrieve a sequence
            indicating the throttle duration for each given element.

    Returns:
        Observable: the throttled sequence.
    """

    source = self

    def subscribe(observer):
        cancelable = SerialDisposable()
        has_value = [False]
        value = [None]
        _id = [0]

        def on_next(x):
            throttle = None
            try:
                throttle = throttle_duration_selector(x)
            except Exception as e:
                observer.on_error(e)
                return

            has_value[0] = True
            value[0] = x
            _id[0] += 1
            current_id = _id[0]
            d = SingleAssignmentDisposable()
            cancelable.disposable = d

            def on_next(x):
                if has_value[0] and _id[0] == current_id:
                    observer.on_next(value[0])

                has_value[0] = False
                d.dispose()

            def on_completed():
                if has_value[0] and _id[0] == current_id:
                    observer.on_next(value[0])

                has_value[0] = False
                d.dispose()

            d.disposable = throttle.subscribe(on_next, observer.on_error,
                                              on_completed)

        def on_error(e):
            cancelable.dispose()
            observer.on_error(e)
            has_value[0] = False
            _id[0] += 1

        def on_completed():
            cancelable.dispose()
            if has_value[0]:
                observer.on_next(value[0])

            observer.on_completed()
            has_value[0] = False
            _id[0] += 1

        subscription = source.subscribe(on_next, on_error, on_completed)
        return CompositeDisposable(subscription, cancelable)
    return AnonymousObservable(subscribe)