Source code for rx.linq.observable.slice

from rx import Observable
from rx.internal import extensionmethod


@extensionmethod(Observable, name="slice")
def slice_(self, start=None, stop=None, step=1):
    """Slices the given observable. It is basically a wrapper around the
    operators skip(), skip_last(), take(), take_last() and filter().

    This marble diagram helps you remember how slices work with streams.
    Positive numbers are relative to the start of the events, while
    negative numbers are relative to the end (on_completed) of the stream.

    r---e---a---c---t---i---v---e---|
    0   1   2   3   4   5   6   7   8
   -8  -7  -6  -5  -4  -3  -2  -1

    Example:
    result = source.slice(1, 10)
    result = source.slice(1, -2)
    result = source.slice(1, -1, 2)

    Keyword arguments:
    :param Observable self: Observable to slice
    :param int start: If non-negative, the number of leading elements to
        skip; if negative, the number of trailing elements to keep
        (take last). None means no start bound.
    :param int stop: If non-negative, the absolute position at which to
        stop taking elements; if negative, the number of trailing elements
        to drop (skip last). None means no stop bound. An empty range
        such as slice(3, 1) or slice(0, 0) produces an empty sequence.
    :param int step: Takes every step element. Must be larger than zero;
        None, 0 and 1 apply no filtering.

    :returns: Returns a sliced observable sequence.
    :rtype: Observable
    :raises TypeError: If step is negative (reversing events is not
        supported).
    """

    source = self

    if start is not None:
        if start < 0:
            source = source.take_last(abs(start))
        else:
            source = source.skip(start)

    if stop is not None:
        if stop >= 0:
            # `stop` is an absolute position, but a non-negative `start`
            # has already removed that many leading elements above, so the
            # number still to take is the difference. Clamp at zero so an
            # empty range (stop <= start, including stop == 0) yields an
            # empty sequence instead of handing take() a negative count.
            #
            # With a negative `start` the difference is stop + |start|,
            # which is never smaller than the take_last() window applied
            # above, so the take() should leave that window untouched.
            # The true overlap cannot be computed without knowing the
            # length of the stream.
            count = stop - (start or 0)
            source = source.take(max(count, 0))
        else:
            source = source.skip_last(abs(stop))

    if step is not None:
        if step > 1:
            # Keep only the elements whose index (after start/stop have
            # been applied) is a multiple of step.
            source = source.filter(lambda x, i: i % step == 0)
        elif step < 0:
            # Reversing events is not supported
            raise TypeError("Negative step not supported.")

    return source


@extensionmethod(Observable)
def __getitem__(self, key):
    """Slices the given observable using Python slice notation. The
    arguments to slice are start, stop and step given within brackets []
    and separated with the ':' character. A plain integer index is also
    accepted and is treated as a one-element slice. It is basically a
    wrapper around the operators skip(), skip_last(), take(), take_last()
    and filter().

    This marble diagram helps you remember how slices work with streams.
    Positive numbers are relative to the start of the events, while
    negative numbers are relative to the end (on_completed) of the stream.

    r---e---a---c---t---i---v---e---|
    0   1   2   3   4   5   6   7   8
   -8  -7  -6  -5  -4  -3  -2  -1

    Example:
    result = source[1:10]
    result = source[1:-2]
    result = source[1:-1:2]
    result = source[3]
    result = source[-1]

    Keyword arguments:
    :param Observable self: Observable to slice
    :param key: Slice object, or an int selecting a single element
    :type key: slice or int

    :returns: A sliced observable sequence.
    :rtype: Observable
    :raises TypeError: If key is not of type int or slice
    """

    if isinstance(key, slice):
        start, stop, step = key.start, key.stop, key.step
    elif isinstance(key, int):
        # A single index is the one-element slice [key:key + 1]. For the
        # last element (key == -1) that would give stop == 0, which as an
        # absolute position means "before the first element" rather than
        # "the end of the stream"; use an open-ended stop instead so the
        # result does not depend on how slice() interprets a zero stop.
        start, step = key, 1
        stop = None if key == -1 else key + 1
    else:
        raise TypeError("Invalid argument type.")

    return self.slice(start, stop, step)