# Source code for rx.linq.observable.timeinterval

from rx.core import Observable
from rx.concurrency import timeout_scheduler
from rx.internal.utils import TimeInterval
from rx.internal import extensionmethod


@extensionmethod(Observable)
def time_interval(self, scheduler=None):
    """Records the time interval between consecutive values in an
    observable sequence.

    1 - res = source.time_interval()
    2 - res = source.time_interval(Scheduler.timeout)

    Keyword arguments:
    scheduler -- [Optional] Scheduler used to compute time intervals. If
        not specified (or falsy), the timeout scheduler is used.

    Each source value x is mapped to TimeInterval(value=x, interval=span),
    where span is the difference between the scheduler's `now` reading for
    this value and the previous reading. For the first value, the previous
    reading is the one taken when the deferred factory runs (presumably at
    subscription time, per Observable.defer).

    Return An observable sequence with time interval information on values.
    """

    source = self
    # The default of None (or any other falsy value) selects the timeout
    # scheduler, which makes the documented zero-argument call work.
    scheduler = scheduler or timeout_scheduler

    def defer():
        # A one-element list acts as a mutable cell so the nested selector
        # can update the previous reading without `nonlocal`. It is created
        # inside the factory, so each factory invocation gets its own
        # baseline.
        last = [scheduler.now]

        def selector(x):
            now = scheduler.now
            span = now - last[0]
            # Remember this reading so the next value is measured from it.
            last[0] = now
            return TimeInterval(value=x, interval=span)

        return source.map(selector)
    return Observable.defer(defer)