# Source code for rx.linq.observable.throw

from rx.core import Observable, AnonymousObservable

from rx.concurrency import immediate_scheduler
from rx.internal import extensionclassmethod


@extensionclassmethod(Observable, alias="throw_exception")
def throw(cls, exception, scheduler=None):
    """Returns an observable sequence that terminates with an exception,
    using the specified scheduler to send out the single OnError message.

    1 - res = rx.Observable.throw_exception(Exception('Error'))
    2 - res = rx.Observable.throw_exception(Exception('Error'),
                                            rx.Scheduler.timeout)

    Keyword arguments:
    exception -- An object used for the sequence's termination. Instances
        of Exception, including instances of its subclasses (e.g.
        ValueError), are delivered to observers unchanged; any other
        value (e.g. a string) is wrapped as Exception(value).
    scheduler -- Scheduler to send the exceptional termination call on. If
        not specified, defaults to ImmediateScheduler.

    Returns the observable sequence that terminates exceptionally with the
    specified exception object.
    """

    scheduler = scheduler or immediate_scheduler

    # Use isinstance rather than an exact type comparison: an exact match
    # would re-wrap every Exception subclass in a plain Exception and hide
    # the original error type from observers.
    if not isinstance(exception, Exception):
        exception = Exception(exception)

    def subscribe(observer):
        # The scheduler invokes the action with (scheduler, state); neither
        # is needed here because the action only forwards the error.
        def action(scheduler, state):
            observer.on_error(exception)

        # The value returned by schedule() is handed back to the subscriber.
        return scheduler.schedule(action)
    return AnonymousObservable(subscribe)