Source code for rx.linq.observable.distinct

from rx.core import Observable, AnonymousObservable
from rx.internal.basic import identity, default_comparer
from rx.internal import extensionmethod

# Swap out for Array.findIndex
def array_index_of_comparer(array, item, comparer):
    for i, a in enumerate(array):
        if comparer(a, item):
            return i
    return -1

class HashSet(object):
    def __init__(self, comparer):
        self.comparer = comparer
        self.set = []

    def push(self, value):
        ret_value = array_index_of_comparer(self.set, value, self.comparer) == -1
        ret_value and self.set.append(value)
        return ret_value


@extensionmethod(Observable)
def distinct(self, key_selector=None, comparer=None):
    """Returns an observable sequence that contains only distinct elements
    according to the key_selector and the comparer. Usage of this operator
    should be considered carefully due to the maintenance of an internal
    lookup structure which can grow large.

    Example:
    res = obs = xs.distinct()
    obs = xs.distinct(lambda x: x.id)
    obs = xs.distinct(lambda x: x.id, lambda a,b: a == b)

    Keyword arguments:
    key_selector -- {Function} [Optional]  A function to compute the
        comparison key for each element.
    comparer -- {Function} [Optional]  Used to compare items in the
        collection.

    Returns an observable {Observable} sequence only containing the distinct
    elements, based on a computed key value, from the source sequence.
    """

    source = self
    comparer = comparer or default_comparer

    def subscribe(observer):
        hashset = HashSet(comparer)

        def on_next(x):
            key = x

            if key_selector:
                try:
                    key = key_selector(x)
                except Exception as ex:
                    observer.on_error(ex)
                    return

            hashset.push(key) and observer.on_next(x)
        return source.subscribe(on_next, observer.on_error,
                                observer.on_completed)
    return AnonymousObservable(subscribe)