Source code for rx.linq.observable.fromiterable

from rx import Lock
from rx.core import Observable, AnonymousObservable
from rx.concurrency import current_thread_scheduler
from rx.internal import extensionclassmethod


@extensionclassmethod(Observable, alias=["from_", "from_list"])
def from_iterable(cls, iterable, scheduler=None):
    """Converts an array to an observable sequence, using an optional
    scheduler to enumerate the array.

    1 - res = rx.Observable.from_iterable([1,2,3])
    2 - res = rx.Observable.from_iterable([1,2,3], rx.Scheduler.timeout)

    Keyword arguments:
    :param Observable cls: Observable class
    :param Scheduler scheduler: [Optional] Scheduler to run the
        enumeration of the input sequence on.

    :returns: The observable sequence whose elements are pulled from the
        given enumerable sequence.
    :rtype: Observable
    """

    scheduler = scheduler or current_thread_scheduler
    lock = Lock()

    def subscribe(observer):
        iterator = iter(iterable)

        def action(action1, state=None):
            try:
                with lock:
                    item = next(iterator)
            except StopIteration:
                observer.on_completed()
            else:
                observer.on_next(item)
                action1(action)

        return scheduler.schedule_recursive(action)
    return AnonymousObservable(subscribe)