Source code for rx.linq.observable.skiplast

from rx import Observable, AnonymousObservable
from rx.internal import extensionmethod


@extensionmethod(Observable)
def skip_last(self, count):
    """Bypasses a specified number of elements at the end of an observable
    sequence.

    Description:
    This operator accumulates a queue with a length enough to store the
    first `count` elements. As more elements are received, elements are
    taken from the front of the queue and produced on the result sequence.
    This causes elements to be delayed.

    Keyword arguments
    count -- Number of elements to bypass at the end of the source sequence.

    Returns an observable {Observable} sequence containing the source
    sequence elements except for the bypassed ones at the end.
    """

    source = self

    def subscribe(observer):
        q = []

        def on_next(x):
            front = None
            with self.lock:
                q.append(x)
                if len(q) > count:
                    front = q.pop(0)

            if not front is None:
                observer.on_next(front)

        return source.subscribe(on_next, observer.on_error, 
                                observer.on_completed)
    return AnonymousObservable(subscribe)