Source code for rx.linq.observable.timer

import logging
from datetime import datetime

from rx.core import Scheduler, Observable, AnonymousObservable
from rx.concurrency import timeout_scheduler
from rx.internal import extensionclassmethod

# Logger named "Rx"; this module uses it for debug output.
log = logging.getLogger("Rx")


def observable_timer_date(duetime, scheduler):
    """Build a one-shot timer that fires at an absolute due time.

    Arguments:
      duetime: Absolute due time, passed unchanged to
        ``scheduler.schedule_absolute``.
      scheduler: Scheduler used to schedule the emission.

    Returns:
      An ``AnonymousObservable`` that, for each subscription, schedules a
      single action which emits ``0`` and then completes. The value
      returned by ``schedule_absolute`` is returned from the subscribe
      function.
    """

    def emit_once_at(observer):
        def fire(scheduler, state):
            # One tick, immediately followed by completion.
            observer.on_next(0)
            observer.on_completed()

        return scheduler.schedule_absolute(duetime, fire)

    return AnonymousObservable(emit_once_at)


def observable_timer_date_and_period(duetime, period, scheduler):
    """Build a recurring timer that starts at an absolute due time.

    Arguments:
      duetime: Absolute time of the first emission.
      period: Interval between emissions; it is passed through
        ``scheduler.normalize`` once, when the observable is created.
      scheduler: Scheduler used for the recursive absolute scheduling.

    Returns:
      An ``AnonymousObservable`` emitting 0, 1, 2, ... Each subscription
      keeps its own counter and its own next due time.
    """
    interval = scheduler.normalize(period)

    def subscribe(observer):
        # One-element lists serve as mutable cells that the nested
        # callback can update without rebinding an outer name.
        emitted = [0]
        due = [duetime]

        def tick(state):
            if interval > 0:
                current = scheduler.now
                upcoming = due[0] + scheduler.to_timedelta(interval)
                if upcoming <= current:
                    # The computed slot is not in the future any more;
                    # re-base the next emission on the current time.
                    upcoming = current + scheduler.to_timedelta(interval)
                due[0] = upcoming

            value = emitted[0]
            observer.on_next(value)
            emitted[0] = value + 1
            # ``state`` is the callable supplied by the scheduler; calling
            # it with a due time requests the next run of this action.
            state(due[0])

        return scheduler.schedule_recursive_with_absolute(due[0], tick)

    return AnonymousObservable(subscribe)


def observable_timer_timespan(duetime, scheduler):
    """Build a one-shot timer that fires after a relative due time.

    Arguments:
      duetime: Relative due time; it is passed through
        ``scheduler.normalize`` once, when the observable is created.
      scheduler: Scheduler used to schedule the emission.

    Returns:
      An ``AnonymousObservable`` that, for each subscription, schedules a
      single action which emits ``0`` and then completes.
    """
    delay = scheduler.normalize(duetime)

    def emit_once_after(observer):
        def fire(scheduler, state):
            # One tick, immediately followed by completion.
            observer.on_next(0)
            observer.on_completed()

        return scheduler.schedule_relative(delay, fire)

    return AnonymousObservable(emit_once_after)


def observable_timer_timespan_and_period(duetime, period, scheduler):
    """Build a recurring timer that starts after a relative due time.

    Arguments:
      duetime: Relative time before the first emission.
      period: Interval between subsequent emissions.
      scheduler: Scheduler used to run the timer.

    Returns:
      When ``duetime == period``, an ``AnonymousObservable`` driven by
      ``scheduler.schedule_periodic`` with a counter starting at 0.
      Otherwise a deferred observable which, when its factory runs,
      converts the relative due time to an absolute one and delegates to
      ``observable_timer_date_and_period``.
    """

    def subscribe_periodic(observer):
        def emit(count):
            observer.on_next(count)
            # The returned value is the state for the next periodic run.
            return count + 1

        return scheduler.schedule_periodic(period, emit, state=0)

    def build_absolute_timer():
        # Resolved only when the factory is invoked, so the first due time
        # is measured from that moment rather than from construction.
        first_due = scheduler.now + scheduler.to_timedelta(duetime)
        return observable_timer_date_and_period(first_due, period, scheduler)

    if duetime == period:
        return AnonymousObservable(subscribe_periodic)
    return Observable.defer(build_absolute_timer)


@extensionclassmethod(Observable)
def timer(cls, duetime, period=None, scheduler=None):
    """Returns an observable sequence that produces a value after duetime
    has elapsed and then after each period.

    Examples::

        from datetime import datetime

        res = Observable.timer(datetime.now())
        res = Observable.timer(datetime.now(), 1000)
        res = Observable.timer(datetime.now(), Scheduler.timeout)
        res = Observable.timer(datetime.now(), 1000, Scheduler.timeout)

        res = Observable.timer(5000)
        res = Observable.timer(5000, 1000)
        res = Observable.timer(5000, scheduler=Scheduler.timeout)
        res = Observable.timer(5000, 1000, Scheduler.timeout)

    Arguments:
      duetime: Absolute (specified as a :func:`datetime.datetime` object) or
        relative time (specified as an integer denoting milliseconds) at which
        to produce the first value.

    Keyword Arguments:
      period (int): Period to produce subsequent values (specified as
        an integer denoting milliseconds), or the scheduler to run the
        timer on. If not specified, the resulting timer is not recurring.
        When a ``Scheduler`` instance is given here it is used as the
        scheduler (unless ``scheduler`` is also given explicitly, which
        takes precedence) and the timer is not recurring.
      scheduler (Scheduler): Scheduler to run the timer on. If not
        specified, the timeout scheduler is used.

    Returns:
      An observable sequence that produces a value after due time has
      elapsed and then each period.
    """

    log.debug("Observable.timer(duetime=%s, period=%s)", duetime, period)

    # Support the documented shorthand ``timer(duetime, some_scheduler)``:
    # a scheduler in the ``period`` slot is not a period at all.
    if isinstance(period, Scheduler):
        if scheduler is None:
            scheduler = period
        period = None

    scheduler = scheduler or timeout_scheduler

    # Dispatch on the kind of due time first, then on whether a period was
    # supplied. ``period is None`` (rather than truthiness) keeps an
    # explicit ``period=0`` with an absolute due time on the absolute-time
    # path instead of treating the datetime as a relative timespan.
    if isinstance(duetime, datetime):
        if period is None:
            return observable_timer_date(duetime, scheduler)
        return observable_timer_date_and_period(duetime, period, scheduler)

    if period is None:
        return observable_timer_timespan(duetime, scheduler)

    return observable_timer_timespan_and_period(duetime, period, scheduler)