# Source code for rx.linq.observable.buffer

from rx.core import Observable
from rx.internal import extensionmethod


@extensionmethod(Observable)
def buffer(self, buffer_openings=None, closing_selector=None, buffer_closing_selector=None):
    """Projects each element of an observable sequence into zero or more
    buffers.

    The work is delegated to self.window(...); every window it produces is
    then collected with to_iterable() and flattened into the result with
    select_many().

    Keyword arguments:
    buffer_openings -- Observable sequence whose elements denote the
        creation of buffers.
    closing_selector -- Or, a function invoked to define the boundaries of
        the produced buffers (a buffer is started when the previous one is
        closed, resulting in non-overlapping buffers).
    buffer_closing_selector -- [optional] A function invoked to define the
        closing of each produced buffer. It is forwarded to window()
        together with buffer_openings.

    The arguments are examined in this order:
    1. buffer_openings without buffer_closing_selector -- only
       buffer_openings is forwarded to window().
    2. closing_selector -- only closing_selector is forwarded to window().
    3. otherwise -- buffer_openings and buffer_closing_selector are both
       forwarded to window().

    Returns an observable sequence of buffers.
    """

    def collect(window):
        return window.to_iterable()

    # Truthiness tests (rather than "is not None") are kept on purpose so
    # that existing callers see the same dispatch as before.
    if buffer_openings and not buffer_closing_selector:
        windows = self.window(buffer_openings)
    elif closing_selector:
        windows = self.window(closing_selector)
    else:
        # closing_selector is known to be falsy here, so it must not be
        # passed on as the first argument: doing so discarded
        # buffer_openings whenever it was combined with
        # buffer_closing_selector.
        windows = self.window(buffer_openings, buffer_closing_selector)

    return windows.select_many(collect)


@extensionmethod(Observable)
def buffer_with_count(self, count, skip=None):
    """Groups the elements of an observable sequence into buffers whose
    boundaries are decided by counting elements.

    Example:
    res = xs.buffer_with_count(10)
    res = xs.buffer_with_count(10, 1)

    Keyword parameters:
    count -- {Number} Length of each buffer.
    skip -- {Number} [Optional] Number of elements to skip between the
        start of consecutive buffers. When omitted (None), count is used.

    Returns an observable {Observable} sequence of buffers; buffers that
    turn out to be empty are not emitted.
    """

    # A missing skip means "start the next buffer count elements later".
    step = count if skip is None else skip

    def collect(window):
        # Turn one window into its collected contents.
        return window.to_iterable()

    def is_non_empty(items):
        # Used as the filter predicate to drop empty buffers.
        return len(items) > 0

    windows = self.window_with_count(count, step)
    buffers = windows.select_many(collect)
    return buffers.filter(is_non_empty)