Source code for rx.linq.observable.range

from rx.core import Observable, AnonymousObservable
from rx.concurrency import current_thread_scheduler
from rx.internal import extensionclassmethod


@extensionclassmethod(Observable)
def range(cls, start, count, scheduler=None):
    """Generates an observable sequence of integral numbers within a
    specified range, using the specified scheduler to send out observer
    messages.

    1 - res = Rx.Observable.range(0, 10)
    2 - res = Rx.Observable.range(0, 10, rx.Scheduler.timeout)

    Keyword arguments:
    start -- The value of the first integer in the sequence.
    count -- The number of sequential integers to generate.
    scheduler -- [Optional] Scheduler to run the generator loop on. If not
        specified, defaults to Scheduler.current_thread.

    Returns an observable sequence that contains a range of sequential
    integral numbers.
    """

    scheduler = scheduler or current_thread_scheduler

    def subscribe(observer):
        def action(recurse, i):
            if i < count:
                observer.on_next(start + i)
                recurse(i + 1)
            else:
                observer.on_completed()

        return scheduler.schedule_recursive(action, 0)
    return AnonymousObservable(subscribe)