Source code for rx.linq.observable.publish

from rx import Observable
from rx.subjects import Subject
from rx.internal import extensionmethod


@extensionmethod(Observable)
def publish(self, selector=None):
    """Returns an observable sequence that is the result of invoking the
    selector on a connectable observable sequence that shares a single
    subscription to the underlying sequence. This operator is a
    specialization of Multicast using a regular Subject.

    Example:
    res = source.publish()
    res = source.publish(lambda x: x)

    selector -- {Function} [Optional] Selector function which can use the
        multicasted source sequence as many times as needed, without causing
        multiple subscriptions to the source sequence. Subscribers to the
        given source will receive all notifications of the source from the
        time of the subscription on.

    Returns an observable {Observable} sequence that contains the elements
    of a sequence produced by multicasting the source sequence within a
    selector function."""

    if selector:
        return self.multicast(subject_selector=lambda: Subject(), selector=selector)
    else:
        return self.multicast(subject=Subject())


@extensionmethod(Observable)
def share(self):
    """Share a single subscription among multple observers.

    Returns a new Observable that multicasts (shares) the original
    Observable. As long as there is at least one Subscriber this
    Observable will be subscribed and emitting data. When all
    subscribers have unsubscribed it will unsubscribe from the source
    Observable.

    This is an alias for Observable.publish().ref_count().
    """
    return self.publish().ref_count()