import logging
from rx import AnonymousObservable, Observable
from rx.internal.utils import add_ref
from rx.internal import noop
from rx.disposables import SingleAssignmentDisposable, SerialDisposable, \
CompositeDisposable, RefCountDisposable
from rx.subjects import Subject
from rx.internal import extensionmethod
log = logging.getLogger("Rx")
@extensionmethod(Observable)
def window(self, window_openings=None, window_closing_selector=None):
"""Projects each element of an observable sequence into zero or more windows.
Keyword arguments:
window_openings (Observable): Observable sequence whose elements
denote the creation of windows.
window_closing_selector (types.FunctionType, optional): A function
invoked to define the closing of each produced window. It defines the
boundaries of the produced windows (a window is started when the
previous one is closed, resulting in non-overlapping windows).
Returns:
(Observable): An observable sequence of windows.
"""
# Make it possible to call window with a single unnamed parameter
if not isinstance(window_openings, Observable) and callable(window_openings):
window_closing_selector = window_openings
window_openings = None
if window_openings and not window_closing_selector:
return observable_window_with_bounaries(self, window_openings)
if not window_openings and window_closing_selector:
return observable_window_with_closing_selector(self, window_closing_selector)
return observable_window_with_openings(self, window_openings, window_closing_selector)
def observable_window_with_openings(self, window_openings, window_closing_selector):
return window_openings.group_join(self, window_closing_selector, lambda _: Observable.empty(), lambda _, window: window)
def observable_window_with_bounaries(self, window_boundaries):
source = self
def subscribe(observer):
window = [Subject()]
d = CompositeDisposable()
r = RefCountDisposable(d)
observer.on_next(add_ref(window[0], r))
def on_next_window(x):
window[0].on_next(x)
def on_error(err):
window[0].on_error(err)
observer.on_error(err)
def on_completed():
window[0].on_completed()
observer.on_completed()
d.add(source.subscribe(on_next_window, on_error, on_completed))
def on_next_observer(w):
window[0].on_completed()
window[0] = Subject()
observer.on_next(add_ref(window[0], r))
d.add(window_boundaries.subscribe(on_next_observer, on_error, on_completed))
return r
return AnonymousObservable(subscribe)
def observable_window_with_closing_selector(self, window_closing_selector):
source = self
def subscribe(observer):
m = SerialDisposable()
d = CompositeDisposable(m)
r = RefCountDisposable(d)
window = [Subject()]
observer.on_next(add_ref(window[0], r))
def on_next(x):
window[0].on_next(x)
def on_error(ex):
window[0].on_error(ex)
observer.on_error(ex)
def on_completed():
window[0].on_completed()
observer.on_completed()
d.add(source.subscribe(on_next, on_error, on_completed))
def create_window_close():
try:
window_close = window_closing_selector()
except Exception as exception:
log.error("*** Exception: %s" % exception)
observer.on_error(exception)
return
def on_completed():
window[0].on_completed()
window[0] = Subject()
observer.on_next(add_ref(window[0], r))
create_window_close()
m1 = SingleAssignmentDisposable()
m.disposable = m1
m1.disposable = window_close.take(1).subscribe(noop, on_error, on_completed)
create_window_close()
return r
return AnonymousObservable(subscribe)