Source code for rx.linq.observable.find

from rx.core import Observable, AnonymousObservable
from rx.internal import extensionmethod


def find_value(source, predicate, yield_index):
    """Build an observable that reports the first element of `source`
    satisfying `predicate`.

    Keyword arguments:
    source -- Observable sequence to search; it is subscribed to with
        on_next / on_error / on_completed callbacks.
    predicate -- {Function} Called as predicate(value, index, source) for
        each element, where index is the zero-based integer position of the
        element. A truthy result marks the element as the match.
    yield_index -- {bool} When truthy the resulting sequence emits the
        index of the match (or -1 when the source completes without one);
        otherwise it emits the matching element itself (or None).

    Returns an AnonymousObservable that emits a single value as described
    above and then completes. An exception raised by the predicate is
    forwarded to the observer's on_error; errors from the source are
    forwarded unchanged.
    """

    def subscribe(observer):
        # One-element list used as a mutable cell so the nested callback can
        # update the counter without relying on `nonlocal`.
        i = [0]

        def on_next(x):
            try:
                # Hand the predicate the integer index, not the list cell
                # that holds it.
                should_run = predicate(x, i[0], source)
            except Exception as ex:
                observer.on_error(ex)
                return

            if should_run:
                observer.on_next(i[0] if yield_index else x)
                observer.on_completed()
            else:
                i[0] += 1

        def on_completed():
            # Source finished without a match: emit the "not found" value.
            observer.on_next(-1 if yield_index else None)
            observer.on_completed()

        return source.subscribe(on_next, observer.on_error, on_completed)
    return AnonymousObservable(subscribe)

@extensionmethod(Observable)
def find(self, predicate):
    """Return the first element of this sequence accepted by the predicate.

    Delegates to find_value with index reporting switched off, so the
    resulting sequence carries the matching element rather than its
    position.

    Keyword arguments:
    predicate -- {Function} Test applied to the elements of the sequence;
        find_value invokes it with the element plus position information
        and the source sequence. A truthy result selects the element.

    Returns an Observable {Observable} sequence that emits the first
    element for which the predicate holds, or None when the source
    completes without any element matching.
    """

    # False => emit the matched element itself, not its index.
    emit_index = False
    return find_value(self, predicate, emit_index)