Buffer
See also
- Official ReactiveX documentation: Buffer
Observable.buffer(buffer_openings=None, closing_selector=None, buffer_closing_selector=None)
Projects each element of an observable sequence into zero or more buffers.
Keyword arguments:
- buffer_openings – Observable sequence whose elements denote the creation of windows.
- closing_selector – Alternatively, a function invoked to define the boundaries of the produced windows (a window is started when the previous one is closed, resulting in non-overlapping windows).
- buffer_closing_selector – [Optional] A function invoked to define the closing of each produced window. If a closing selector function is specified for the first parameter, this parameter is ignored.
Returns an observable sequence of buffers.
Observable.buffer_with_count(count, skip=None)
Projects each element of an observable sequence into zero or more buffers which are produced based on element count information.
Example:
res = xs.buffer_with_count(10)
res = xs.buffer_with_count(10, 1)
Keyword parameters:
- count – {Number} Length of each buffer.
- skip – {Number} [Optional] Number of elements to skip between creation of consecutive buffers. If not provided, defaults to the count.
Returns an observable {Observable} sequence of buffers.
>>> import rx
>>> items = ['red', 'yellow', 'green', 'cyan', 'blue', 'purple']
>>> source = rx.Observable.from_(items).buffer_with_count(3)
>>> subscription = source.subscribe(
...     lambda value: print("Next:", value),
...     lambda error: print("Error:", error),
...     lambda: print("Complete!"))
Next: ['red', 'yellow', 'green']
Next: ['cyan', 'blue', 'purple']
Complete!
>>> import rx
>>> items = ['red', 'yellow', 'green', 'cyan', 'blue', 'purple']
>>> source = rx.Observable.from_(items).buffer_with_count(2, skip=3)
>>> subscription = source.subscribe(
...     lambda value: print("Next:", value),
...     lambda error: print("Error:", error),
...     lambda: print("Complete!"))
Next: ['red', 'yellow']
Next: ['cyan', 'blue']
Complete!
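The interaction between count and skip in the two examples above can be modelled without the library. The following is a minimal sketch in plain Python, not the library's implementation: the function name buffer_with_count_sketch is hypothetical, it works on a finished list rather than a live observable, and keeping short trailing buffers is an assumption about what happens when the source completes.

```python
def buffer_with_count_sketch(items, count, skip=None):
    """Model count/skip buffering on a plain list (hypothetical helper)."""
    if skip is None:
        skip = count  # documented default: skip equals count
    items = list(items)
    buffers = []
    # A new buffer starts every `skip` elements and holds up to `count` elements.
    for start in range(0, len(items), skip):
        # Assumption: a trailing buffer shorter than `count` is still emitted.
        buffers.append(items[start:start + count])
    return buffers


colors = ['red', 'yellow', 'green', 'cyan', 'blue', 'purple']
print(buffer_with_count_sketch(colors, 3))
# → [['red', 'yellow', 'green'], ['cyan', 'blue', 'purple']]
print(buffer_with_count_sketch(colors, 2, skip=3))
# → [['red', 'yellow'], ['cyan', 'blue']]
```

When skip is larger than count, the elements between buffers ('green' and 'purple' here) are dropped; when skip is smaller than count, consecutive buffers overlap.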
Observable.buffer_with_time(timespan, timeshift=None, scheduler=None)
Projects each element of an observable sequence into zero or more buffers which are produced based on timing information.
# 1 - non-overlapping segments of 1 second
res = xs.buffer_with_time(1000)
# 2 - segments of 1 second with a time shift of 0.5 seconds
res = xs.buffer_with_time(1000, 500)
Keyword arguments:
- timespan – Length of each buffer (specified as an integer denoting milliseconds).
- timeshift – [Optional] Interval between creation of consecutive buffers (specified as an integer denoting milliseconds), or an optional scheduler parameter. If not specified, the time shift corresponds to the timespan parameter, resulting in non-overlapping adjacent buffers.
- scheduler – [Optional] Scheduler to run buffer timers on. If not specified, the timeout scheduler is used.
Returns an observable sequence of buffers.
>>> import rx
>>> ryg = ['red', 'yellow', 'green']
>>> cbp = ['cyan', 'blue', 'purple']
>>> xs = rx.Observable.timer(10, 50).take(3).map(lambda i: ryg[i])
>>> ys = rx.Observable.timer(60, 50).take(3).map(lambda i: cbp[i])
>>> source = xs.concat(ys).buffer_with_time(timespan=100)
>>> subscription = source.subscribe(
...     lambda value: print("Next:", value),
...     lambda error: print("Error:", error),
...     lambda: print("Complete!"))
Next: ['red', 'yellow']
Next: ['green', 'cyan']
Next: ['blue', 'purple']
Complete!
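The grouping in the example above can be reasoned about with an idealized model of the non-overlapping case (timespan only). This is a rough sketch in plain Python, not the library's implementation: the name buffer_with_time_sketch is hypothetical, the arrival times are assumed from the timer arguments (xs at about 10, 60 and 110 ms, then ys at about 170, 220 and 270 ms after xs completes), scheduler jitter is ignored, and intervals with no elements are simply skipped.

```python
from itertools import groupby


def buffer_with_time_sketch(timed_items, timespan):
    """Group (timestamp_ms, value) pairs into fixed, non-overlapping time bins.

    Hypothetical helper: an element arriving at time t is assigned to
    bin number t // timespan. Empty bins are not reported.
    """
    ordered = sorted(timed_items, key=lambda pair: pair[0])
    buffers = []
    for _, group in groupby(ordered, key=lambda pair: pair[0] // timespan):
        buffers.append([value for _, value in group])
    return buffers


# Assumed arrival times in milliseconds for the example above.
arrivals = [(10, 'red'), (60, 'yellow'), (110, 'green'),
            (170, 'cyan'), (220, 'blue'), (270, 'purple')]
print(buffer_with_time_sketch(arrivals, timespan=100))
# → [['red', 'yellow'], ['green', 'cyan'], ['blue', 'purple']]
```

Real timers do not fire at exact millisecond boundaries, so output from the library may group elements that arrive close to a boundary differently from this model.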
>>> import rx
>>> rygc = ['red', 'yellow', 'green', 'cyan']
>>> bp = ['blue', 'purple']
>>> xs = rx.Observable.timer(30, 50).take(4).map(lambda i: rygc[i])
>>> ys = rx.Observable.timer(80, 50).take(2).map(lambda i: bp[i])
>>> source = xs.concat(ys).buffer_with_time(timeshift=110, timespan=60)
>>> subscription = source.subscribe(
...     lambda value: print("Next:", value),
...     lambda error: print("Error:", error),
...     lambda: print("Complete!"))
Next: ['red']
Next: ['green', 'cyan']
Next: ['blue']
Complete!
Observable.buffer_with_time_or_count(timespan, count, scheduler)
Projects each element of an observable sequence into a buffer that is completed when either it is full or a given amount of time has elapsed.
# 1 - 5s or 50 items in an array
res = source.buffer_with_time_or_count(5000, 50)
# 2 - 5s or 50 items in an array
res = source.buffer_with_time_or_count(5000, 50, Scheduler.timeout)
Keyword arguments:
- timespan – Maximum time length of a buffer.
- count – Maximum element count of a buffer.
- scheduler – [Optional] Scheduler to run buffering timers on. If not specified, the timeout scheduler is used.
Returns an observable sequence of buffers.
>>> import rx
>>> from rx.core import Scheduler
>>> ryg = ['red', 'yellow', 'green']
>>> cbp = ['cyan', 'blue', 'purple']
>>> xs = rx.Observable.timer(10, 20).take(3).map(lambda i: ryg[i])
>>> ys = rx.Observable.timer(40, 30).take(3).map(lambda i: cbp[i])
>>> source = xs.concat(ys) \
...     .buffer_with_time_or_count(timespan=40,
...                                count=2,
...                                scheduler=Scheduler.timeout)
>>> subscription = source.subscribe(
...     lambda value: print("Next:", value),
...     lambda error: print("Error:", error),
...     lambda: print("Complete!"))
Next: ['red', 'yellow']
Next: ['green']
Next: ['cyan', 'blue']
Next: ['purple']
Complete!
Observable.pairwise()
Returns a new observable that triggers on the second and subsequent triggerings of the input observable. The Nth triggering of the input observable passes the arguments from the (N-1)th and Nth triggerings as a pair. The argument passed to the (N-1)th triggering is held in hidden internal state until the Nth triggering occurs.
Returns an observable {Observable} that triggers on successive pairs of observations from the input observable as an array.
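The "hidden internal state" described for pairwise amounts to remembering the previous element. A minimal sketch of that idea in plain Python follows; it is not the library's implementation, the generator name pairwise_sketch is hypothetical, and yielding each pair as a 2-tuple is an assumption about the pair's concrete type.

```python
def pairwise_sketch(items):
    """Yield (previous, current) for every element after the first (hypothetical helper)."""
    iterator = iter(items)
    try:
        previous = next(iterator)  # the first element only primes the state
    except StopIteration:
        return  # empty input: nothing to pair
    for current in iterator:
        yield (previous, current)
        previous = current  # hold the latest value until the next element arrives


print(list(pairwise_sketch([1, 2, 3, 4])))
# → [(1, 2), (2, 3), (3, 4)]
```

An input with fewer than two elements produces no pairs, which matches the statement that the result first triggers on the second triggering of the input.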