# Source code for rx.backpressure.controlledobservable

from rx.core import Observable, ObservableBase
from rx.internal import extensionmethod

from .controlledsubject import ControlledSubject


class ControlledObservable(ObservableBase):
    """Observable that routes a source through a ``ControlledSubject``.

    The source sequence is multicast through a ``ControlledSubject`` and
    reference counted; subscribers are attached to that shared sequence,
    and ``request`` calls are forwarded to the subject.
    """

    def __init__(self, source, enable_queue, scheduler=None):
        """Wrap ``source`` behind a newly created ``ControlledSubject``.

        Keyword arguments:
        :param Observable source: the sequence to be controlled
        :param bool enable_queue: passed through to ``ControlledSubject``
        :param Scheduler scheduler: passed through to ``ControlledSubject``
        """
        super(ControlledObservable, self).__init__()

        # The subject is kept on the instance so that request() can reach it.
        self.subject = ControlledSubject(enable_queue, scheduler)
        # All subscribers share one multicast connection through the subject.
        self.source = source.multicast(self.subject).ref_count()

    def _subscribe_core(self, observer):
        """Subscribe ``observer`` to the shared, subject-backed sequence.

        :returns: whatever ``self.source.subscribe`` returns.
        """
        return self.source.subscribe(observer)

    def request(self, number_of_items=None):
        """Forward a request for items to the underlying subject.

        Keyword arguments:
        :param int number_of_items: number of items to request. May be
            omitted (or passed as ``None``), in which case ``-1`` is
            forwarded to the subject instead.
        :returns: the result of ``ControlledSubject.request``.
        """
        if number_of_items is None:
            # NOTE(review): -1 is the sentinel handed to the subject; its
            # exact meaning is defined by ControlledSubject.request, which
            # is not visible here -- confirm there.
            number_of_items = -1
        return self.subject.request(number_of_items)


@extensionmethod(Observable)
def controlled(self, enable_queue=True, scheduler=None):
    """Wrap this observable sequence in a ControlledObservable.

    The returned sequence has a controller attached, optionally with a
    queue, and exposes ``request`` for asking for values.

    Example:
    source = rx.Observable.interval(100).controlled()
    source.request(3) # Reads 3 values

    Keyword arguments:
    :param bool enable_queue: truthy value to determine if values should
        be queued pending the next request
    :param Scheduler scheduler: determines how the requests will be scheduled
    :returns: The observable sequence which only propagates values on request.
    :rtype: Observable
    """

    gated = ControlledObservable(
        source=self,
        enable_queue=enable_queue,
        scheduler=scheduler,
    )
    return gated