Source code for rx.linq.observable.minby

from rx import AnonymousObservable, Observable
from rx.internal.basic import default_sub_comparer
from rx.internal import extensionmethod

def extrema_by(source, key_selector, comparer):
    def subscribe(observer):
        has_value = [False]
        last_key = [None]
        list = []

        def on_next(x):
            try:
                key = key_selector(x)
            except Exception as ex:
                observer.on_error(ex)
                return

            comparison = 0;

            if not has_value[0]:
                has_value[0] = True
                last_key[0] = key
            else:
                try:
                    comparison = comparer(key, last_key[0])
                except Exception as ex1:
                    observer.on_error(ex1)
                    return

            if comparison > 0:
                last_key[0] = key
                #list.clear()
                list[:] = []

            if comparison >= 0:
                list.append(x)

        def on_completed():
            observer.on_next(list)
            observer.on_completed()

        return source.subscribe(on_next, observer.on_error, on_completed)
    return AnonymousObservable(subscribe)


@extensionmethod(Observable)
def min_by(self, key_selector, comparer=None):
    """Returns the elements in an observable sequence with the minimum key
    value according to the specified comparer.

    Example
    res = source.min_by(lambda x: x.value)
    res = source.min_by(lambda x: x.value, lambda x, y: x - y)

    Keyword arguments:
    key_selector -- {Function} Key selector function.
    comparer -- {Function} [Optional] Comparer used to compare key values.

    Returns an observable {Observable} sequence containing a list of zero
    or more elements that have a minimum key value.
    """

    comparer = comparer or default_sub_comparer

    return extrema_by(self, key_selector, lambda x, y: comparer(x, y) * -1)