Source code for rx.linq.observable.sample

from rx.core import Observable, AnonymousObservable
from rx.disposables import CompositeDisposable
from rx.concurrency import timeout_scheduler
from rx.internal import extensionmethod


def sample_observable(source, sampler):

    def subscribe(observer):
        at_end = [None]
        has_value = [None]
        value = [None]

        def sample_subscribe(x=None):
            if has_value[0]:
                has_value[0] = False
                observer.on_next(value[0])

            if at_end[0]:
                observer.on_completed()

        def on_next(new_value):
            has_value[0] = True
            value[0] = new_value

        def on_completed():
            at_end[0] = True

        return CompositeDisposable(
            source.subscribe(on_next, observer.on_error, on_completed),
            sampler.subscribe(sample_subscribe, observer.on_error, sample_subscribe)
        )
    return AnonymousObservable(subscribe)


@extensionmethod(Observable, alias="throttle_last")
def sample(self, interval=None, sampler=None, scheduler=None):
    """Samples the observable sequence at each interval.

    Examples::

        # Sampler tick sequence
        res = source.sample(sample_observable)
        # 5 seconds (5000 milliseconds)
        res = source.sample(5000)
        # 5 seconds
        res = source.sample(5000, rx.scheduler.timeout)

    Arguments:

      source (Observable): Source sequence to sample.
      
    Keyword Arguments:
      interval (int): Interval at which to sample (specified as an integer
        denoting milliseconds).
      scheduler (Secheduler): Scheduler to run the sampling timer on. If not
        specified, the timeout scheduler is used.

    Returns:
      (Observable): sampled observable sequence.
    """

    scheduler = scheduler or timeout_scheduler
    if interval is not None:
        return sample_observable(self, Observable.interval(interval, scheduler=scheduler))

    return sample_observable(self, sampler)