# Source code for rx.backpressure.pausablebuffered

from collections import deque

from rx.core import ObservableBase, Observable, AnonymousObservable
from rx.internal import extensionmethod
from rx.subjects import Subject
from rx.disposables import CompositeDisposable


def combine_latest_source(source, subject, result_selector):
    """Combine the latest values of `source` and `subject` into one sequence.

    Each value from either input is recorded. Once both inputs have produced
    at least one value, every further value emits
    result_selector(latest_source_value, latest_subject_value).

    The latest `subject` value also acts as a gate and is tested for
    truthiness:

    - A `source` error is forwarded immediately while the gate is truthy;
      otherwise it is held back and delivered when a later value arrives.
    - `source` completion is forwarded immediately while the gate is truthy;
      otherwise completion is sent after the next value that leaves the
      gate truthy.
    - A `subject` error is always forwarded immediately.
    - `subject` completion marks the sequence as done and is then handled
      as if `subject` had emitted True.

    Keyword arguments:
    source -- {Observable} Sequence supplying the first selector argument.
    subject -- {Observable} Sequence supplying the second selector argument
        and the gate value.
    result_selector -- {Function} Called as
        result_selector(source_value, subject_value); an exception raised
        by it is sent to the observer's on_error.

    Returns an AnonymousObservable whose subscription is a
    CompositeDisposable holding both inner subscriptions."""

    def subscribe(observer):
        # One-element lists serve as mutable cells shared by the closures
        # below (the code avoids `nonlocal`).
        has_value = [False, False]
        has_value_all = [False]
        values = [None, None]
        is_done = [False]
        err = [None]

        def on_value(x, i):
            has_value[i] = True
            values[i] = x
            has_value_all[0] = has_value_all[0] or all(has_value)

            # Deliver a held-back source error. Besides the "both inputs have
            # a value" case, also deliver once the gate is truthy, so that an
            # error from a source that never produced a value is not dropped
            # (or turned into a completion when `subject` completes).
            if err[0] is not None and (has_value_all[0] or values[1]):
                observer.on_error(err[0])
                return

            if has_value_all[0]:
                try:
                    res = result_selector(*values)
                except Exception as ex:
                    observer.on_error(ex)
                    return
                observer.on_next(res)

            # Completion deferred while the gate was falsy is sent as soon as
            # a value leaves the gate truthy.
            if is_done[0] and values[1]:
                observer.on_completed()

        def on_error_source(e):
            if values[1]:
                observer.on_error(e)
            else:
                # Gate is falsy: remember the error for on_value to deliver.
                err[0] = e

        def on_completed_source():
            is_done[0] = True
            if values[1]:
                observer.on_completed()

        def on_completed_subject():
            is_done[0] = True
            # Processed like a final truthy gate value.
            on_value(True, 1)

        return CompositeDisposable(
            source.subscribe(lambda x: on_value(x, 0), on_error_source, on_completed_source),
            subject.subscribe(lambda x: on_value(x, 1), observer.on_error, on_completed_subject)
        )
    return AnonymousObservable(subscribe)


class PausableBufferedObservable(ObservableBase):
    """Observable that buffers values from `source` while paused.

    The pause state is driven by a stream of values tested for truthiness:
    a truthy value means "emit", a falsy value means "buffer". That stream
    is the internal `controller` subject (fed by pause() and resume()),
    merged with the optional `pauser` passed to the constructor. On
    subscription the stream is prefixed with False, so the first state seen
    is "paused".

    When the state changes to truthy, the buffered values are emitted in
    arrival order. The buffer is also flushed before an error or completion
    is forwarded. Each subscription has its own buffer."""

    def __init__(self, source, pauser=None):
        """Create the pausable wrapper.

        Keyword arguments:
        source -- {Observable} The sequence whose values are buffered while
            paused.
        pauser -- {Observable} Optional sequence of truthy/falsy values
            controlling the pause state. It is used only if it is truthy
            and has a `subscribe` attribute; otherwise only pause() and
            resume() control the state."""
        self.source = source
        self.controller = Subject()

        if pauser and hasattr(pauser, "subscribe"):
            self.pauser = self.controller.merge(pauser)
        else:
            self.pauser = self.controller

        super(PausableBufferedObservable, self).__init__()

    def _subscribe_core(self, observer):
        """Subscribe `observer` and return the resulting subscription.

        Keyword arguments:
        observer -- {Observer} Receives the live and flushed values, plus
            the terminal error or completion."""
        # One-element list: a mutable cell shared with the closures below.
        # None means no combined result has been handled yet.
        previous_should_fire = [None]
        # deque gives O(1) removal from the front; a list's pop(0) is O(n).
        queue = deque()

        def drain_queue():
            # Emit buffered values oldest-first.
            while queue:
                observer.on_next(queue.popleft())

        def result_selector(data, should_fire=False):
            return {"data": data, "should_fire": should_fire}

        def on_next(results):
            should_fire = results.get("should_fire")
            state_changed = (previous_should_fire[0] is not None
                             and should_fire != previous_should_fire[0])
            previous_should_fire[0] = should_fire

            if state_changed:
                # Triggered by the pause state changing, not by new data:
                # flush on a change to truthy, do nothing on a change to
                # falsy.
                if should_fire:
                    drain_queue()
            elif should_fire:
                # New data while running: pass it straight through.
                observer.on_next(results["data"])
            else:
                # New data while paused: hold it for later.
                queue.append(results["data"])

        def on_error(err):
            # Empty buffer before sending error
            drain_queue()
            observer.on_error(err)

        def on_completed():
            # Empty buffer before sending completion
            drain_queue()
            observer.on_completed()

        subscription = combine_latest_source(
            self.source,
            self.pauser.distinct_until_changed().start_with(False),
            result_selector
        ).subscribe(on_next, on_error, on_completed)

        return subscription

    def pause(self):
        """Pause the sequence by sending False through the controller."""
        self.controller.on_next(False)

    def resume(self):
        """Resume the sequence by sending True through the controller."""
        self.controller.on_next(True)


@extensionmethod(Observable)
def pausable_buffered(self, subject):
    """Pauses the underlying observable sequence based upon the observable
    sequence which yields True/False, and yields the values that were
    buffered while paused.

    Example:
    pauser = Subject()
    source = Observable.interval(100).pausable_buffered(pauser)

    Keyword arguments:
    subject -- {Observable} The observable sequence used to pause the
        underlying sequence. Truthy values resume it and falsy values
        pause it.

    Returns a PausableBufferedObservable wrapping this sequence, which is
    paused based upon `subject` and can also be controlled through its
    pause() and resume() methods."""

    return PausableBufferedObservable(self, subject)