Source code for rx.backpressure.pausable


from rx.core import Observable, ObservableBase, Disposable
from rx.internal import extensionmethod
from rx.disposables import CompositeDisposable
from rx.subjects import Subject


class PausableObservable(ObservableBase):
    def __init__(self, source, pauser=None):
        self.source = source
        self.controller = Subject()

        if pauser and hasattr(pauser, "subscribe"):
            self.pauser = self.controller.merge(pauser)
        else:
            self.pauser = self.controller

        super(PausableObservable, self).__init__()

    def _subscribe_core(self, observer):
        conn = self.source.publish()
        subscription = conn.subscribe(observer)
        connection = [Disposable.empty()]

        def on_next(b):
            if b:
                connection[0] = conn.connect()
            else:
                connection[0].dispose()
                connection[0] = Disposable.empty()

        pausable = self.pauser.distinct_until_changed().subscribe(on_next)
        return CompositeDisposable(subscription, connection[0], pausable)

    def pause(self):
        self.controller.on_next(False)

    def resume(self):
        self.controller.on_next(True)


@extensionmethod(Observable)
def pausable(self, pauser):
    """Pauses the underlying observable sequence based upon the observable
    sequence which yields True/False.

    Example:
    pauser = rx.Subject()
    source = rx.Observable.interval(100).pausable(pauser)

    Keyword parameters:
    pauser -- {Observable} The observable sequence used to pause the
        underlying sequence.

    Returns the observable {Observable} sequence which is paused based upon
    the pauser.
    """

    return PausableObservable(self, pauser)