from rx.core import Observable, AnonymousObservable
from rx.disposables import CompositeDisposable, SingleAssignmentDisposable
from rx.internal import extensionmethod, extensionclassmethod
@extensionmethod(Observable, instancemethod=True)
def zip(self, *args):
"""Merges the specified observable sequences into one observable
sequence by using the selector function whenever all of the observable
sequences or an array have produced an element at a corresponding index.
The last element in the arguments must be a function to invoke for each
series of elements at corresponding indexes in the sources.
Examples::
res = obs1.zip(obs2, fn)
res = x1.zip([1,2,3], fn)
Returns an observable sequence containing the result of combining
elements of the sources using the specified result selector function.
"""
parent = self
sources = list(args)
result_selector = sources.pop()
sources.insert(0, parent)
if args and isinstance(args[0], list):
return _zip_list(self, *args)
def subscribe(observer):
n = len(sources)
queues = [[] for _ in range(n)]
is_done = [False] * n
def next(i):
if all([len(q) for q in queues]):
try:
queued_values = [x.pop(0) for x in queues]
res = result_selector(*queued_values)
except Exception as ex:
observer.on_error(ex)
return
observer.on_next(res)
elif all([x for j, x in enumerate(is_done) if j != i]):
observer.on_completed()
def done(i):
is_done[i] = True
if all(is_done):
observer.on_completed()
subscriptions = [None]*n
def func(i):
source = sources[i]
sad = SingleAssignmentDisposable()
source = Observable.from_future(source)
def on_next(x):
queues[i].append(x)
next(i)
sad.disposable = source.subscribe(on_next, observer.on_error, lambda: done(i))
subscriptions[i] = sad
for idx in range(n):
func(idx)
return CompositeDisposable(subscriptions)
return AnonymousObservable(subscribe)
@extensionclassmethod(Observable)
def zip(cls, *args):
"""Merges the specified observable sequences into one observable
sequence by using the selector function whenever all of the observable
sequences have produced an element at a corresponding index.
The last element in the arguments must be a function to invoke for each
series of elements at corresponding indexes in the sources.
Arguments:
args (List[Observable]): Observable sources.
Returns:
Observable: an observable sequence containing the result of
combining elements of the sources using the specified result selector
function.
"""
first = args[0]
return first.zip(*args[1:])
def _zip_list(source, second, result_selector):
first = source
def subscribe(observer):
length = len(second)
index = [0]
def on_next(left):
if index[0] < length:
right = second[index[0]]
index[0] += 1
try:
result = result_selector(left, right)
except Exception as ex:
observer.on_error(ex)
return
observer.on_next(result)
else:
observer.on_completed()
return first.subscribe(on_next, observer.on_error, observer.on_completed)
return AnonymousObservable(subscribe)