Source code for rx.linq.observable.reduce

from rx import Observable
from rx.internal import extensionmethod


@extensionmethod(Observable, alias="aggregate")
def reduce(self, accumulator, seed=None):
    """Applies an accumulator function over an observable sequence,
    returning the result of the aggregation as a single element in the
    result sequence. The specified seed value is used as the initial
    accumulator value.

    For aggregation behavior with incremental intermediate results, see
    Observable.scan.

    Example:
    1 - res = source.reduce(lambda acc, x: acc + x)
    2 - res = source.reduce(lambda acc, x: acc + x, 0)

    Keyword arguments:
    :param types.FunctionType accumulator: An accumulator function to be
        invoked on each element.
    :param T seed: Optional initial accumulator value.

    :returns: An observable sequence containing a single element with the
        final accumulator value.
    :rtype: Observable
    """

    if seed is not None:
        return self.scan(accumulator, seed=seed).start_with(seed).last()
    else:
        return self.scan(accumulator).last()