from rx.core import Observable, AnonymousObservable
from rx.disposables import CompositeDisposable, SingleAssignmentDisposable
from rx.internal import extensionmethod, extensionclassmethod
def listify_args(*a):
return list(a)
@extensionmethod(Observable, instancemethod=True)
def with_latest_from(self, *args):
"""Merges the specified observable sequences into one observable sequence
by using the selector function only when the source observable sequence
(the instance) produces an element. The other observables can be passed
either as seperate arguments or as a list.
Examples::
obs = observable.with_latest_from(obs1, obs2, obs3,
lambda o1, o2, o3: o1 + o2 + o3)
obs = observable.with_latest_from([obs1, obs2, obs3],
lambda o1, o2, o3: o1 + o2 + o3)
Returns an observable sequence containing the result of combining
elements of the sources using the specified result selector function.
"""
args = list(args)
if args and isinstance(args[0], list):
args = args[0]
args.insert(0, self)
return Observable.with_latest_from(*args)
@extensionclassmethod(Observable)
def with_latest_from(cls, *args):
"""Merges the specified observable sequences into one observable sequence
by using the selector function only when the first observable sequence
produces an element. The observables can be passed either as seperate
arguments or as a list.
Excamples::
obs = Observable.with_latest_from(obs1, obs2, obs3,
lambda o1, o2, o3: o1 + o2 + o3)
obs = Observable.with_latest_from([obs1, obs2, obs3],
lambda o1, o2, o3: o1 + o2 + o3)
Returns an observable sequence containing the result of combining
elements of the sources using the specified result selector function.
"""
if args and isinstance(args[0], list):
args = args[0]
else:
args = list(args)
result_selector = args.pop()
NO_VALUE = object()
def subscribe(observer):
def subscribe_all(parent, *children):
values = [NO_VALUE for _ in children]
def subscribe_child(i, child):
subscription = SingleAssignmentDisposable()
def on_next(value):
with parent.lock:
values[i] = value
subscription.disposable = child.subscribe(
on_next, observer.on_error)
return subscription
parent_subscription = SingleAssignmentDisposable()
def on_next(value):
with parent.lock:
if NO_VALUE not in values:
try:
result = result_selector(value, *values)
except Exception as error:
observer.on_error(error)
else:
observer.on_next(result)
parent_subscription.disposable = parent.subscribe(
on_next, observer.on_error, observer.on_completed)
return listify_args(
parent_subscription,
*(subscribe_child(*a) for a in enumerate(children))
)
return CompositeDisposable(subscribe_all(*args))
return AnonymousObservable(subscribe)