Source code for rx.linq.observable.generatewithrelativetime

from rx.core import Observable, AnonymousObservable
from rx.concurrency import timeout_scheduler
from rx.internal import extensionclassmethod


@extensionclassmethod(Observable)
def generate_with_relative_time(cls, initial_state, condition, iterate,
                                result_selector, time_selector,
                                scheduler=None):
    """Generates an observable sequence by iterating a state from an
    initial state until the condition fails.

    res = source.generate_with_relative_time(0,
        lambda x: True,
        lambda x: x + 1,
        lambda x: x,
        lambda x: 500)

    initial_state -- Initial state.
    condition -- Condition to terminate generation (upon returning false).
    iterate -- Iteration step function.
    result_selector -- Selector function for results produced in the
        sequence.
    time_selector -- Time selector function to control the speed of values
        being produced each iteration, returning integer values denoting
        milliseconds.
    scheduler -- [Optional] Scheduler on which to run the generator loop.
        If not specified, the timeout scheduler is used.

    Returns the generated sequence.
    """

    scheduler = scheduler or timeout_scheduler

    def subscribe(observer):
        state = [initial_state]
        has_result = [False]
        result = [None]
        first = [True]
        time = None

        def action(this):
            if has_result[0]:
                observer.on_next(result[0])

            try:
                if first[0]:
                    first[0] = False
                else:
                    state[0] = iterate(state[0])

                has_result[0] = condition(state[0])
                if has_result[0]:
                    result[0] = result_selector(state[0])
                    time = time_selector(state[0])

            except Exception as e:
                observer.on_error(e)
                return

            if has_result[0]:
                this(time)
            else:
                observer.on_completed()

        return scheduler.schedule_recursive_with_relative(0, action)
    return AnonymousObservable(subscribe)