Source code for rx.linq.observable.takeuntilwithtime

from datetime import datetime

from rx.core import Observable, AnonymousObservable
from rx.disposables import CompositeDisposable
from rx.internal import extensionmethod
from rx.concurrency import timeout_scheduler

@extensionmethod(Observable)
def take_until_with_time(self, end_time, scheduler=None):
    """Takes elements for the specified duration until the specified end
    time, using the specified scheduler to run timers.

    Examples:
    1 - res = source.take_until_with_time(dt, [optional scheduler])
    2 - res = source.take_until_with_time(5000, [optional scheduler])

    Keyword Arguments:
    end_time -- {Number | Date} Time to stop taking elements from the source
        sequence. If this value is less than or equal to Date(), the
        result stream will complete immediately.
    scheduler -- {Scheduler} Scheduler to run the timer on.

    Returns an observable {Observable} sequence with the elements taken
    until the specified end time.
    """

    scheduler = scheduler or timeout_scheduler
    source = self

    if isinstance(end_time, datetime):
        scheduler_method = scheduler.schedule_absolute
    else:
        scheduler_method = scheduler.schedule_relative

    def subscribe(observer):
        def action(scheduler, state):
            observer.on_completed()

        task = scheduler_method(end_time, action)
        return CompositeDisposable(task,  source.subscribe(observer))
    return AnonymousObservable(subscribe)