Source code for rx.linq.observable.windowwithcount

import logging

from rx import AnonymousObservable, Observable
from rx.internal.utils import add_ref
from rx.disposables import SingleAssignmentDisposable, RefCountDisposable
from rx.internal.exceptions import ArgumentOutOfRangeException
from rx.subjects import Subject
from rx.internal import extensionmethod

log = logging.getLogger("Rx")


@extensionmethod(Observable)
def window_with_count(self, count, skip=None):
    """Projects each element of an observable sequence into zero or more
    windows which are produced based on element count information.

    Example::

        xs.window_with_count(10)
        xs.window_with_count(10, 1)

    Arguments:
      count (int): Length of each window.
      skip (int, optional) Number of elements to skip between creation of
        consecutive windows. If not specified, defaults to the count.

    Returns:
      Observable: an observable sequence of windows.
    """

    source = self
    if count <= 0:
        raise ArgumentOutOfRangeException()

    if skip is None:
        skip = count

    if skip <= 0:
        raise ArgumentOutOfRangeException()

    def subscribe(observer):
        m = SingleAssignmentDisposable()
        refCountDisposable = RefCountDisposable(m)
        n = [0]
        q = []

        def create_window():
            s = Subject()
            q.append(s)
            observer.on_next(add_ref(s, refCountDisposable))

        create_window()

        def on_next(x):
            for item in q:
                item.on_next(x)

            c = n[0] - count + 1
            if c >= 0 and c % skip == 0:
                s = q.pop(0);
                s.on_completed()

            n[0] += 1
            if (n[0] % skip) == 0:
                create_window()

        def on_error(exception):
            while len(q):
                q.pop(0).on_error(exception)
            observer.on_error(exception)

        def on_completed():
            while len(q):
                q.pop(0).on_completed()
            observer.on_completed()

        m.disposable = source.subscribe(on_next, on_error, on_completed)
        return refCountDisposable
    return AnonymousObservable(subscribe)