from rx.core import Observable, AnonymousObservable
from rx.concurrency import current_thread_scheduler
from rx.internal import extensionclassmethod
@extensionclassmethod(Observable)
def generate(cls, initial_state, condition, iterate, result_selector,
scheduler=None):
"""Generates an observable sequence by running a state-driven loop
producing the sequence's elements, using the specified scheduler to
send out observer messages.
1 - res = rx.Observable.generate(0,
lambda x: x < 10,
lambda x: x + 1,
lambda x: x)
2 - res = rx.Observable.generate(0,
lambda x: x < 10,
lambda x: x + 1,
lambda x: x,
Rx.Scheduler.timeout)
Keyword arguments:
initial_state -- Initial state.
condition -- Condition to terminate generation (upon returning False).
iterate -- Iteration step function.
result_selector -- Selector function for results produced in the
sequence.
scheduler -- [Optional] Scheduler on which to run the generator loop.
If not provided, defaults to CurrentThreadScheduler.
Returns the generated sequence.
"""
scheduler = scheduler or current_thread_scheduler
def subscribe(observer):
first = [True]
state = [initial_state]
def action (action1, state1=None):
has_result = False
result = None
try:
if first[0]:
first[0] = False
else:
state[0] = iterate(state[0])
has_result = condition(state[0])
if has_result:
result = result_selector(state[0])
except Exception as exception:
observer.on_error(exception)
return
if has_result:
observer.on_next(result)
action1()
else:
observer.on_completed()
return scheduler.schedule_recursive(action)
return AnonymousObservable(subscribe)