Source code for rx.linq.observable.multicast

from rx import Observable, AnonymousObservable
from rx.linq.connectableobservable import ConnectableObservable
from rx.disposables import CompositeDisposable
from rx.internal import extensionmethod


@extensionmethod(Observable)
def multicast(self, subject=None, subject_selector=None, selector=None):
    """Multicasts the source sequence notifications through a subject.

    Two modes are supported, chosen by whether subject_selector is given:

    * Without subject_selector, the source is wrapped in a
      ConnectableObservable built from the source and the given subject.
    * With subject_selector, an observable is returned for which each
      subscription causes a separate multicast invocation: a fresh subject
      is created by calling subject_selector(), the source is multicast
      through it, the observer is subscribed to the sequence produced by
      the selector function, and the multicast is then connected.

    For specializations with fixed subject types, see Publish,
    PublishLast, and Replay.

    Example:
    1 - res = source.multicast(subject)
    2 - res = source.multicast(subject_selector=lambda: Subject(),
                               selector=lambda x: x)

    Keyword arguments:
    subject -- {Subject} Subject to push source elements into. Ignored
        when subject_selector is given.
    subject_selector -- {Function} Factory function, called with no
        arguments, to create an intermediate subject through which the
        source sequence's elements will be multicast to the selector
        function.
    selector -- {Function} [Optional] Selector function which can use the
        multicasted source sequence subject to the policies enforced by
        the created subject. Used only if subject_selector is given; when
        omitted, observers are subscribed to the multicasted sequence
        itself.

    Returns an observable {Observable} sequence that contains the elements
    of a sequence produced by multicasting the source sequence within a
    selector function, or a ConnectableObservable when no subject_selector
    is given.
    """

    source = self

    # Plain-subject mode: the caller decides when to connect.
    if not subject_selector:
        return ConnectableObservable(source, subject)

    def subscribe(observer):
        # Every subscription gets its own subject and its own connection.
        connectable = source.multicast(subject=subject_selector())

        # The selector is documented as optional; without one, the
        # observer subscribes to the connectable directly instead of
        # failing on a call to None.
        if selector is None:
            shared = connectable
        else:
            shared = selector(connectable)

        # Subscribe before connecting, so the observer is already attached
        # when the connection to the source is made.
        subscription = shared.subscribe(observer)
        connection = connectable.connect()
        return CompositeDisposable(subscription, connection)

    return AnonymousObservable(subscribe)