Source code for rx.linq.observable.take

from rx import Observable, AnonymousObservable
from rx.internal import ArgumentOutOfRangeException
from rx.internal import extensionmethod


@extensionmethod(Observable)
def take(self, count, scheduler=None):
    """Emits only the first `count` elements of an observable sequence
    and then completes.

    1 - source.take(5)
    2 - source.take(0, rx.Scheduler.timeout)

    Keyword arguments:
    count -- Number of leading elements to forward; must not be negative.
    scheduler -- [Optional] Scheduler handed to Observable.empty() when
        count is 0; it is not used in any other case.

    Returns an observable sequence holding at most `count` elements taken
    from the start of the source sequence. Raises
    ArgumentOutOfRangeException if count is negative.
    """

    if count < 0:
        raise ArgumentOutOfRangeException()

    if not count:
        # Nothing to take: answer with an empty sequence built on the
        # caller-supplied scheduler.
        return Observable.empty(scheduler)

    source = self

    def subscribe(observer):
        # A one-element list serves as a mutable cell the nested callback
        # can update; it is created per subscription, so every subscriber
        # starts with a fresh budget.
        budget = [count]

        def on_next(value):
            if not budget[0] > 0:
                # Budget already spent: drop the value.
                return

            # Decrement before forwarding so the cell is up to date by the
            # time the downstream observer runs.
            budget[0] -= 1
            observer.on_next(value)
            if budget[0]:
                return

            # That was the last permitted element.
            observer.on_completed()

        # Errors and completion from the source pass straight through.
        return source.subscribe(
            on_next,
            observer.on_error,
            observer.on_completed)

    return AnonymousObservable(subscribe)