Source code for rx.linq.observable.distinctuntilchanged

from rx.core import Observable, AnonymousObservable
from rx.internal.basic import identity, default_comparer
from rx.internal import extensionmethod


@extensionmethod(Observable)
def distinct_until_changed(self, key_selector=None, comparer=None):
    """Returns an observable sequence that contains only distinct
    contiguous elements according to the key_selector and the comparer.

    1 - obs = observable.distinct_until_changed();
    2 - obs = observable.distinct_until_changed(lambda x: x.id)
    3 - obs = observable.distinct_until_changed(lambda x: x.id,
                                                lambda x, y: x == y)

    key_selector -- [Optional] A function to compute the comparison key for
        each element. If not provided, it projects the value.
    comparer -- [Optional] Equality comparer for computed key values. If
        not provided, defaults to an equality comparer function.

    Return An observable sequence only containing the distinct contiguous
    elements, based on a computed key value, from the source sequence.
    """

    source = self
    key_selector = key_selector or identity
    comparer = comparer or default_comparer

    def subscribe(observer):
        has_current_key = [False]
        current_key = [None]

        def on_next(value):
            comparer_equals = False
            try:
                key = key_selector(value)
            except Exception as exception:
                observer.on_error(exception)
                return

            if has_current_key[0]:
                try:
                    comparer_equals = comparer(current_key[0], key)
                except Exception as exception:
                    observer.on_error(exception)
                    return

            if not has_current_key[0] or not comparer_equals:
                has_current_key[0] = True
                current_key[0] = key
                observer.on_next(value)

        return source.subscribe(on_next, observer.on_error, observer.on_completed)
    return AnonymousObservable(subscribe)