Buffer

See also

  • Official ReactiveX documentation: Buffer
Observable.buffer(buffer_openings=None, closing_selector=None, buffer_closing_selector=None)

Projects each element of an observable sequence into zero or more buffers.

Keyword arguments:
buffer_openings – Observable sequence whose elements denote the creation of buffers.
closing_selector – Alternatively, a function invoked to define the boundaries of the produced buffers (a buffer is started when the previous one is closed, resulting in non-overlapping buffers).
buffer_closing_selector – [Optional] A function invoked to define the closing of each produced buffer. If a closing selector function is specified for the first parameter, this parameter is ignored.

Returns an observable sequence of buffers.
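
The non-overlapping case described for closing_selector can be pictured on plain data. The following is a minimal sketch in plain Python, not RxPY's implementation: the helper name buffer_by_boundaries_sketch, the (time, value) event tuples, and the list of boundary times are all hypothetical stand-ins for an observable and its closing signals.

```python
from bisect import bisect_right


def buffer_by_boundaries_sketch(events, boundaries):
    """Split (time, value) events into non-overlapping buffers.

    Hypothetical helper: `boundaries` is a sorted list of times at which
    the current buffer closes and the next one opens.
    """
    # One buffer before the first boundary, plus one after each boundary.
    buffers = [[] for _ in range(len(boundaries) + 1)]
    for time, value in events:
        # bisect_right counts the boundaries at or before `time`, which is
        # the index of the buffer that is open when the event arrives.
        buffers[bisect_right(boundaries, time)].append(value)
    return buffers


events = [(1, 'red'), (2, 'yellow'), (4, 'green'), (7, 'cyan')]
print(buffer_by_boundaries_sketch(events, boundaries=[3, 6]))
# prints [['red', 'yellow'], ['green'], ['cyan']]
```

Every value lands in exactly one buffer here, because a new buffer only opens at the moment the previous one closes.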

Observable.buffer_with_count(count, skip=None)

Projects each element of an observable sequence into zero or more buffers which are produced based on element count information.

Example:
    res = xs.buffer_with_count(10)
    res = xs.buffer_with_count(10, 1)

Keyword parameters:
count – {Number} Length of each buffer.
skip – {Number} [Optional] Number of elements to skip between creation of consecutive buffers. If not provided, defaults to the count.

Returns an observable {Observable} sequence of buffers.

>>> import rx
>>> items = ['red', 'yellow', 'green', 'cyan', 'blue', 'purple']
>>> source = rx.Observable.from_(items).buffer_with_count(3)
>>> subscription = source.subscribe(
...     lambda value: print("Next:", value),
...     lambda error: print("Error:", error),
...     lambda: print("Complete!")
... )
Next: ['red', 'yellow', 'green']
Next: ['cyan', 'blue', 'purple']
Complete!
>>> import rx
>>> items = ['red', 'yellow', 'green', 'cyan', 'blue', 'purple']
>>> source = rx.Observable.from_(items).buffer_with_count(2, skip=3)
>>> subscription = source.subscribe(
...     lambda value: print("Next:", value),
...     lambda error: print("Error:", error),
...     lambda: print("Complete!")
... )
Next: ['red', 'yellow']
Next: ['cyan', 'blue']
Complete!
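
The interplay of count and skip in the two examples above can be modelled on a plain list. This is a minimal sketch under stated assumptions, not the library's code: buffer_with_count_sketch is a hypothetical helper, it works on a finished list rather than a live observable, and it chooses to drop empty buffers.

```python
def buffer_with_count_sketch(items, count, skip=None):
    """Hypothetical list-based model of count/skip buffering."""
    if skip is None:
        skip = count  # as documented: skip defaults to count
    buffers = []
    # A new buffer starts every `skip` elements and holds up to `count`.
    for start in range(0, len(items), skip):
        chunk = list(items[start:start + count])
        if chunk:  # assumption of this sketch: empty buffers are dropped
            buffers.append(chunk)
    return buffers


items = ['red', 'yellow', 'green', 'cyan', 'blue', 'purple']
print(buffer_with_count_sketch(items, 3))
# prints [['red', 'yellow', 'green'], ['cyan', 'blue', 'purple']]
print(buffer_with_count_sketch(items, 2, skip=3))
# prints [['red', 'yellow'], ['cyan', 'blue']]
```

When skip is larger than count, the elements between the end of one buffer and the start of the next ('green' and 'purple' here) appear in no buffer. When skip is smaller than count, consecutive buffers overlap.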
Observable.buffer_with_time(timespan, timeshift=None, scheduler=None)

Projects each element of an observable sequence into zero or more buffers which are produced based on timing information.

Example:
    # non-overlapping segments of 1 second
    res = xs.buffer_with_time(1000)
    # segments of 1 second with time shift 0.5 seconds
    res = xs.buffer_with_time(1000, 500)

Keyword arguments:
timespan – Length of each buffer (specified as an integer denoting milliseconds).
timeshift – [Optional] Interval between creation of consecutive buffers (specified as an integer denoting milliseconds), or an optional scheduler parameter. If not specified, the time shift corresponds to the timespan parameter, resulting in non-overlapping adjacent buffers.
scheduler – [Optional] Scheduler to run buffer timers on. If not specified, the timeout scheduler is used.

Returns an observable sequence of buffers.
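
How timespan and timeshift carve up a stream can be illustrated without a scheduler. The sketch below is a simplified model and not RxPY's implementation: buffer_with_time_sketch is a hypothetical helper, the (time_ms, value) arrival times are assumed rather than measured, and the source is assumed to complete together with its last event.

```python
def buffer_with_time_sketch(events, timespan, timeshift=None):
    """Hypothetical model: events is a time-sorted list of (time_ms, value)."""
    if timeshift is None:
        timeshift = timespan  # as documented: adjacent, non-overlapping
    if not events:
        return []
    end = events[-1][0]  # assumption: the source completes at its last event
    buffers = []
    start = 0
    # Buffer k covers the half-open interval [start, start + timespan).
    while start <= end:
        buffers.append([v for t, v in events if start <= t < start + timespan])
        start += timeshift
    return buffers


# Assumed arrival times in milliseconds, for illustration only.
events = [(10, 'red'), (60, 'yellow'), (110, 'green'),
          (170, 'cyan'), (220, 'blue'), (270, 'purple')]
print(buffer_with_time_sketch(events, timespan=100))
# prints [['red', 'yellow'], ['green', 'cyan'], ['blue', 'purple']]
print(buffer_with_time_sketch(events, timespan=60, timeshift=110))
# prints [['red'], ['green'], ['blue', 'purple']]
```

In the second call timeshift exceeds timespan, so values arriving in the gap between two buffers ('yellow' and 'cyan' under these assumed times) are not collected. A timeshift smaller than timespan would instead place some values in more than one buffer.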

>>> import rx
>>> ryg = ['red', 'yellow', 'green']
>>> cbp = ['cyan', 'blue', 'purple']
>>> xs = rx.Observable.timer(10, 50).take(3).map(lambda i: ryg[i])
>>> ys = rx.Observable.timer(60, 50).take(3).map(lambda i: cbp[i])
>>> source = xs.concat(ys).buffer_with_time(timespan=100)
>>> subscription = source.subscribe(
...     lambda value: print("Next:", value),
...     lambda error: print("Error:", error),
...     lambda: print("Complete!")
... )
Next: ['red', 'yellow']
Next: ['green', 'cyan']
Next: ['blue', 'purple']
Complete!
>>> import rx
>>> rygc = ['red', 'yellow', 'green', 'cyan']
>>> bp = ['blue', 'purple']
>>> xs = rx.Observable.timer(30, 50).take(4).map(lambda i: rygc[i])
>>> ys = rx.Observable.timer(80, 50).take(2).map(lambda i: bp[i])
>>> source = xs.concat(ys).buffer_with_time(timeshift=110, timespan=60)
>>> subscription = source.subscribe(
...     lambda value: print("Next:", value),
...     lambda error: print("Error:", error),
...     lambda: print("Complete!")
... )
Next: ['red']
Next: ['green', 'cyan']
Next: ['blue']
Complete!
Observable.buffer_with_time_or_count(timespan, count, scheduler)

Projects each element of an observable sequence into a buffer that is completed when either it’s full or a given amount of time has elapsed.

Example:
    # 5s or 50 items in an array
    res = source.buffer_with_time_or_count(5000, 50)
    # 5s or 50 items in an array, with an explicit scheduler
    res = source.buffer_with_time_or_count(5000, 50, Scheduler.timeout)

Keyword arguments:
timespan – Maximum time length of a buffer.
count – Maximum element count of a buffer.
scheduler – [Optional] Scheduler to run buffering timers on. If not specified, the timeout scheduler is used.

Returns an observable sequence of buffers.
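
The "whichever comes first" rule can be sketched on timestamped data. This is one plausible simplified model, not RxPY's implementation: buffer_with_time_or_count_sketch is a hypothetical helper, the arrival times are invented for illustration, the time limit is assumed to restart whenever a buffer closes, and empty buffers are dropped for brevity.

```python
def buffer_with_time_or_count_sketch(events, timespan, count):
    """Hypothetical model: events is a time-sorted list of (time_ms, value)."""
    buffers = []
    current = []
    deadline = timespan  # the first buffer is assumed to open at t = 0
    for time, value in events:
        # Close every buffer whose time limit passed before this event.
        while time >= deadline:
            if current:  # assumption of this sketch: skip empty buffers
                buffers.append(current)
            current = []
            deadline += timespan
        current.append(value)
        if len(current) == count:
            # Full buffer: emit it and restart the time limit from now.
            buffers.append(current)
            current = []
            deadline = time + timespan
    if current:
        buffers.append(current)  # flush what is left on completion
    return buffers


# Invented arrival times in milliseconds, for illustration only.
events = [(10, 'a'), (20, 'b'), (30, 'c'), (100, 'd'), (110, 'e'), (120, 'f')]
print(buffer_with_time_or_count_sketch(events, timespan=50, count=2))
# prints [['a', 'b'], ['c'], ['d', 'e'], ['f']]
```

Here ['a', 'b'] and ['d', 'e'] close because they are full, ['c'] closes because its time limit expires before 'd' arrives, and ['f'] is flushed when the input ends.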

>>> import rx
>>> from rx.core import Scheduler
>>> ryg = ['red', 'yellow', 'green']
>>> cbp = ['cyan', 'blue', 'purple']
>>> xs = rx.Observable.timer(10, 20).take(3).map(lambda i: ryg[i])
>>> ys = rx.Observable.timer(40, 30).take(3).map(lambda i: cbp[i])
>>> source = xs.concat(ys) \
...     .buffer_with_time_or_count(timespan=40,
...                                count=2,
...                                scheduler=Scheduler.timeout)
>>> subscription = source.subscribe(
...     lambda value: print("Next:", value),
...     lambda error: print("Error:", error),
...     lambda: print("Complete!")
... )
Next: ['red', 'yellow']
Next: ['green']
Next: ['cyan', 'blue']
Next: ['purple']
Complete!
Observable.pairwise()

Returns a new observable that triggers on the second and subsequent emissions of the input observable. The Nth emission of the input observable passes the values from the (N-1)th and Nth emissions as a pair. The value from the (N-1)th emission is held in hidden internal state until the Nth emission occurs.

Returns an observable {Observable} that triggers on successive pairs of observations from the input observable as an array.
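
The pairing rule can be shown on a plain list. The following is a minimal sketch, not the library's code: pairwise_sketch is a hypothetical helper, and representing each pair as a Python tuple is a choice made for this illustration.

```python
def pairwise_sketch(items):
    """Hypothetical list-based model of pairing successive values."""
    pairs = []
    has_previous = False
    previous = None  # plays the role of the hidden internal state
    for current in items:
        if has_previous:
            # From the second value onward, emit (previous, current).
            pairs.append((previous, current))
        previous = current
        has_previous = True
    return pairs


print(pairwise_sketch([1, 2, 3, 4]))
# prints [(1, 2), (2, 3), (3, 4)]
```

An input of N values yields N-1 pairs, so an empty or single-value input produces nothing.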