Source code for rx.linq.observable.publishvalue

from rx import Observable
from rx.subjects import BehaviorSubject
from rx.internal import extensionmethod


@extensionmethod(Observable)
def publish_value(self, initial_value, selector=None):
    """Returns an observable sequence that is the result of invoking the
    selector on a connectable observable sequence that shares a single
    subscription to the underlying sequence and starts with initial_value.

    This operator is a specialization of Multicast using a BehaviorSubject.

    Example:
    res = source.publish_value(42)
    res = source.publish_value(42, lambda x: x.map(lambda y: y * y))

    Keyword arguments:
    initial_value -- {Mixed} Initial value received by observers upon
        subscription.
    selector -- {Function} [Optional] Optional selector function which can
        use the multicasted source sequence as many times as needed, without
        causing multiple subscriptions to the source sequence. Subscribers
        to the given source will receive immediately receive the initial
        value, followed by all notifications of the source from the time of
        the subscription on.

    Returns {Observable} An observable sequence that contains the elements
    of a sequence produced by multicasting the source sequence within a
    selector function.
    """

    if selector:
        def subject_selector():
            return BehaviorSubject(initial_value)

        return self.multicast(subject_selector=subject_selector,
                              selector=selector)
    else:
        return self.multicast(BehaviorSubject(initial_value))