.. include:: operator-aliases.rst ********* Operators ********* Introduction ============ Chaining Operators ------------------ Each language-specific implementation of ReactiveX implements a set of operators. These are the operators specific to the Python implementation (RxPY). Most operators operate on an Observable and return an Observable. This allows you to apply these operators one after the other, in a chain. Each operator in the chain modifies the Observable that results from the operation of the previous operator. A chain of Observable operators do not operate independently on the original Observable that originates the chain, but they operate in turn, each one operating on the Observable generated by the operator immediately previous in the chain. The Operators of RxPY --------------------- This page first lists what could be considered the core operators in ReactiveX, and links to pages that have more in-depth information on how these operators work and how particular language-specific ReactiveX versions have implemented these operators. Next is a decision tree that may help you choose the operator that is most appropriate to your use case. Finally, there is an alphabetical list of the operators available in RxPY. Other implementations of ReactiveX may implement different operators. Operators By Category ===================== Creating Observables -------------------- Operators that originate new Observables. * |create| -- create an Observable from scratch by calling observer methods programmatically * |defer| — do not create the Observable until the observer subscribes, and create a fresh Observable for each observer * |empty|/|never|/|throw| — create Observables that have very precise and limited behavior * |from_| — convert some other object or data structure into an Observable * interval — create an Observable that emits a sequence of integers spaced by a particular time interval * |just| — convert an object or a set of objects into an Observable that emits that or those objects * |range| — create an Observable that emits a range of sequential integers * |repeat| — create an Observable that emits a particular item or sequence of items repeatedly * |start| — create an Observable that emits the return value of a function * |timer| — create an Observable that emits a single item after a given delay Transforming Observables ------------------------ Operators that transform items that are emitted by an Observable. * |buffer| — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time * |flat_map| — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable * |group_by| — divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key * |map| — transform the items emitted by an Observable by applying a function to each item * |scan| — apply a function to each item emitted by an Observable, sequentially, and emit each successive value * |window| — periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time Filtering Observables --------------------- Operators that selectively emit items from a source Observable. * |debounce| — only emit an item from an Observable if a particular timespan has passed without it emitting another item * |distinct| — suppress duplicate items emitted by an Observable * |element_at| — emit only item n emitted by an Observable * |filter| — emit only those items from an Observable that pass a predicate test * |first| — emit only the first item, or the first item that meets a condition, from an Observable * |ignore_elements| — do not emit any items from an Observable but mirror its termination notification * |last| — emit only the last item emitted by an Observable * |sample| — emit the most recent item emitted by an Observable within periodic time intervals * |skip| — suppress the first n items emitted by an Observable * |skip_last| — suppress the last n items emitted by an Observable * |take| — emit only the first n items emitted by an Observable * |take_last| — emit only the last n items emitted by an Observable Combining Observables --------------------- Operators that work with multiple source Observables to create a single Observable * |and_|/|then|/|when| — combine sets of items emitted by two or more Observables by means of Pattern and Plan intermediaries * |combine_latest| — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function * |join| — combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable * |merge| — combine multiple Observables into one by merging their emissions * |start_with| — emit a specified sequence of items before beginning to emit the items from the source Observable * |switch_latest|/ — convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables * |zip| — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function Error Handling Operators ------------------------ Operators that help to recover from error notifications from an Observable * |catch_exception| — recover from an onError notification by continuing the sequence without error * |retry| — if a source Observable sends an onError notification, resubscribe to it in the hopes that it will complete without error Observable Utility Operators ---------------------------- A toolbox of useful Operators for working with Observables * |delay| — shift the emissions from an Observable forward in time by a particular amount * |do_action| — register an action to take upon a variety of Observable lifecycle events * |materialize|/|dematerialize| — represent both the items emitted and the notifications sent as emitted items, or reverse this process * |observe_on| — specify the scheduler on which an observer will observe this Observable * |subscribe| — operate upon the emissions and notifications from an Observable * |subscribe_on| — specify the scheduler an Observable should use when it is subscribed to * |time_interval| — convert an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions * |timeout| — mirror the source Observable, but issue an error notification if a particular period of time elapses without any emitted items * |timestamp| — attach a timestamp to each item emitted by an Observable * |using| — create a disposable resource that has the same lifespan as the Observable Conditional and Boolean Operators --------------------------------- Operators that evaluate one or more Observables or items emitted by Observables * |all| — determine whether all items emitted by an Observable meet some criteria * |amb| — given two or more source Observables, emit all of the items from only the first of these Observables to emit an item * |contains| — determine whether an Observable emits a particular item or not * |default_if_empty| — emit items from the source Observable, or a default item if the source Observable emits nothing * |sequence_equal| — determine whether two Observables emit the same sequence of items * |skip_until| — discard items emitted by an Observable until a second Observable emits an item * |skip_while| — discard items emitted by an Observable until a specified condition becomes false * |take_until| — discard items emitted by an Observable after a second Observable emits an item or terminates * |take_while| — discard items emitted by an Observable after a specified condition becomes false Mathematical and Aggregate Operators ------------------------------------ Operators that operate on the entire sequence of items emitted by an Observable * |average| — calculates the average of numbers emitted by an Observable and emits this average * |concat| — emit the emissions from two or more Observables without interleaving them * |count| — count the number of items emitted by the source Observable and emit only this value * |max| — determine, and emit, the maximum-valued item emitted by an Observable * |min| — determine, and emit, the minimum-valued item emitted by an Observable * |reduce| — apply a function to each item emitted by an Observable, sequentially, and emit the final value * |sum| — calculate the sum of numbers emitted by an Observable and emit this sum Backpressure Operators ---------------------- * *backpressure operators* — strategies for coping with Observables that produce items more rapidly than their observers consume them Connectable Observable Operators -------------------------------- Specialty Observables that have more precisely-controlled subscription dynamics .. todo:: Write documentation for **connect** and **ref_count** .. comment * |connect| — instruct a connectable Observable to begin emitting items to its subscribers * |publish| — convert an ordinary Observable into a connectable Observable .. comment * |ref_count| — make a Connectable Observable behave like an ordinary Observable * |replay| — ensure that all observers see the same sequence of emitted items, even if they subscribe after the Observable has begun emitting items Operators to Convert Observables -------------------------------- * **to** — convert an Observable into another object or data structure A Decision Tree of Observable Operators ======================================= I want to create a new Observable... * that emits a particular item: |just| * that was returned from a function called at subscribe-time: |start| * that was returned from an Action, Callable, Runnable, or something of that sort, called at subscribe-time: |from_| * after a specified delay: |timer| * that pulls its emissions from a particular Array, Iterable, or something like that: |from_| * by retrieving it from a Future: |start| * that obtains its sequence from a Future: |from_| * that emits a sequence of items repeatedly: |repeat| * from scratch, with custom logic: |create| * for each observer that subscribes: |defer| * that emits a sequence of integers: |range| * at particular intervals of time: |interval| * after a specified delay: |timer| * that completes without emitting items: |empty| * that does nothing at all: |never| An Alphabetical List of Observable Operators ============================================ .. !!!AUTO!!! BEGIN 'Operator Index' * |aggregate| * |all| * |amb| * |and_| * |as_observable| * |average| * |buffer| * |buffer_with_count| * |buffer_with_time| * |buffer_with_time_or_count| * |case| * |catch_exception| * |combine_latest| * |concat| * |concat_all| * |contains| * |controlled| * |count| * |create| * |create_with_disposable| * |debounce| * |default_if_empty| * |defer| * |delay| * |delay_subscription| * |delay_with_selector| * |dematerialize| * |distinct| * |distinct_until_changed| * |do_action| * |do_while| * |element_at| * |element_at_or_default| * |empty| * |every| * |exclusive| * |expand| * |filter| * |finally_action| * |find| * |find_index| * |first| * |first_or_default| * |flat_map| * |flat_map_latest| * |for_in| * |from_| * |from_callback| * |from_future| * |from_iterable| * |from_list| * |from_marbles| * |from_string| * |generate| * |generate_with_relative_time| * |group_by| * |group_by_until| * |group_join| * |if_then| * |ignore_elements| * |interval| * |is_empty| * |join| * |just| * |last| * |last_or_default| * |let| * |let_bind| * |many_select| * |map| * |materialize| * |max| * |max_by| * |merge| * |merge_all| * |merge_observable| * |min| * |min_by| * |multicast| * |never| * |observe_on| * |of| * |on_error_resume_next| * |pairwise| * |partition| * |pausable| * |pausable_buffered| * |pluck| * |pluck_attr| * |publish| * |publish_value| * |range| * |reduce| * |repeat| * |replay| * |retry| * |return_value| * |sample| * |scan| * |select| * |select_many| * |select_switch| * |sequence_equal| * |share| * |single| * |single_or_default| * |skip| * |skip_last| * |skip_last_with_time| * |skip_until| * |skip_until_with_time| * |skip_while| * |skip_with_time| * |slice| * |some| * |start| * |start_async| * |start_with| * |subscribe| * |subscribe_on| * |sum| * |switch_case| * |switch_latest| * |switch_map| * |take| * |take_last| * |take_last_buffer| * |take_last_with_time| * |take_until| * |take_until_with_time| * |take_while| * |take_with_time| * |tap| * |then| * |then_do| * |throttle_first| * |throttle_last| * |throttle_with_selector| * |throttle_with_timeout| * |throw| * |throw_exception| * |time_interval| * |timeout| * |timeout_with_selector| * |timer| * |timestamp| * |to_async| * |to_blocking| * |to_dict| * |to_future| * |to_iterable| * |to_list| * |to_marbles| * |to_set| * |to_string| * |transduce| * |using| * |when| * |where| * |while_do| * |window| * |window_with_count| * |window_with_time| * |window_with_time_or_count| * |with_latest_from| * |zip| * |zip_array| * |zip_list| .. toctree:: :hidden: all amb and-then-when as_observable average buffer case catch combine_latest concat contains controlled count create create_with_disposable debounce default_if_empty delay distinct do do_while element_at empty-never-throw filter first flat_map flat_map_latest from from_marbles from_string group_by ignore_elements interval join just last map materialize-dematerialize max merge min min_by observe_on pausable pausable_buffered pluck pluck_attr publish range reduce repeat replay retry sample scan select sequence_equal share skip skip_last skip_until skip_until_with_time skip_while start start_with subscribe subscribe_on sum switch switch_map take take_last take_last_with_time take_until take_until_with_time take_while then time_interval timeout timer timestamp to to_marbles to_string transduce using while_do window zip .. !!!AUTO!!! END 'Operator Index'