ReactiveX 的每个语言特定实现都实现了一组运算符。尽管实现之间存在很多重叠,但也有一些运算符只在某些实现中实现。此外,每个实现都倾向于将其运算符命名为类似于在该语言的其他上下文中已经熟悉的类似方法。
大多数运算符对 Observable 进行操作并返回 Observable。这使您可以将这些运算符一个接一个地应用,形成一个链。链中的每个运算符都会修改前一个运算符操作产生的 Observable。
还存在其他模式,例如 Builder 模式,其中特定类的各种方法对该类的同一个项目进行操作,通过方法的操作修改该对象。这些模式也允许您以类似的方式对方法进行链式操作。但是,虽然在 Builder 模式中,方法在链中出现的顺序通常无关紧要,但对于 Observable 运算符来说,顺序很重要。
Observable 运算符链不会独立地对产生链的原始 Observable 进行操作,而是依次进行操作,每个运算符都会对链中紧邻其前面的运算符生成的 Observable 进行操作。
此页面首先列出了 ReactiveX 中可能被认为是“核心”的运算符,并链接到提供有关这些运算符工作原理以及特定语言特定 ReactiveX 版本如何实现这些运算符的更深入信息的页面。
接下来是一个“决策树”,它可以帮助您选择最适合您的用例的运算符。
最后,是 ReactiveX 的许多语言特定实现中提供的大多数运算符的字母顺序列表。这些链接到记录最接近语言特定运算符的核心运算符的页面(例如,Rx.NET 的“SelectMany”运算符链接到 FlatMap ReactiveX 运算符的文档,“SelectMany”是 Rx.NET 的实现)。
如果要实现自己的运算符,请参阅 实现自己的运算符.
生成新的 Observable 的运算符。
Create — 通过以编程方式调用观察者方法从头开始创建 ObservableDefer — 直到观察者订阅时才创建 Observable,并且为每个观察者创建一个新的 ObservableEmpty/Never/Throw — 创建具有非常精确和有限行为的 ObservableFrom — 将其他对象或数据结构转换为 ObservableInterval — 创建一个 Observable,它按特定时间间隔发出一系列整数Just — 将对象或一组对象转换为发出该对象或这些对象的 ObservableRange — 创建发出连续整数范围的 ObservableRepeat — 创建一个 Observable,它重复发出特定项目或一系列项目Start — 创建一个 Observable,它发出函数的返回值Timer — 创建一个 Observable,它在给定延迟后发出单个项目转换由 Observable 发出的项目的运算符。
Buffer — 定期将 Observable 中的项目收集到捆绑包中,并发出这些捆绑包,而不是一次发出一个项目FlatMap — 将由 Observable 发出的项目转换为 Observable,然后将这些 Observable 的发出项展平成单个 ObservableGroupBy — 将 Observable 分成一组 Observable,每个 Observable 都发出原始 Observable 中的不同组项目,这些项目按键进行组织Map — 通过对每个项目应用函数来转换由 Observable 发出的项目Scan — 对由 Observable 发出的每个项目依次应用函数,并发出每个后续值Window — 定期将 Observable 中的项目细分为 Observable 窗口,并发出这些窗口,而不是一次发出一个项目选择性地从源 Observable 发出项目的运算符。
Debounce — 仅当经过特定时间段后 Observable 没有发出其他项目时,才从 Observable 发出项目Distinct — 抑制由 Observable 发出的重复项目ElementAt — 仅发出由 Observable 发出的第 n 个项目Filter — 仅发出通过谓词测试的 Observable 中的项目First — 仅发出第一个项目,或第一个满足条件的项目,从 Observable 中发出IgnoreElements — 不从 Observable 发出任何项目,但镜像其终止通知Last — 仅发出由 Observable 发出的最后一个项目Sample — 每隔特定时间间隔发出 Observable 发出的最新项目Skip — 抑制由 Observable 发出的前 n 个项目SkipLast — 抑制由 Observable 发出的最后 n 个项目Take — 仅发出由 Observable 发出的前 n 个项目TakeLast — 仅发出由 Observable 发出的最后 n 个项目使用多个源 Observable 来创建单个 Observable 的运算符
And/Then/When — 通过 Pattern 和 Plan 中介,组合两个或多个 Observable 发出的项目集CombineLatest — 当两个 Observable 中的任何一个发出项目时,通过指定函数组合每个 Observable 发出的最新项目,并根据该函数的结果发出项目Join — 每当一个 Observable 在根据另一个 Observable 发出的项目定义的时间窗口内发出项目时,组合两个 Observable 发出的项目Merge — 通过合并多个 Observable 的发出项将多个 Observable 组合成一个StartWith — 在开始发出源 Observable 中的项目之前,发出指定的项目序列Switch — 将发出 Observable 的 Observable 转换为单个 Observable,该 Observable 发出最近发出的那些 Observable 发出的项目Zip — 通过指定函数将多个 Observable 的发出项组合在一起,并根据该函数的结果为每个组合发出单个项目帮助从 Observable 的错误通知中恢复的运算符
用于处理 Observable 的实用运算符工具箱
Delay — 将 Observable 的发出项在时间上向前移动特定量Do — 注册一个操作,以便在 Observable 的各种生命周期事件发生时执行Materialize/Dematerialize — 将发出的项目和发送的通知都表示为发出的项目,或反转此过程ObserveOn — 指定观察者将观察此 Observable 的调度器Serialize — 强制 Observable 进行序列化调用,并确保其行为良好Subscribe — 对 Observable 的发出项和通知进行操作SubscribeOn — 指定 Observable 在订阅时应使用的调度器TimeInterval — 将发出项目的 Observable 转换为发出这些项目之间经过时间量的指示的 ObservableTimeout — 镜像源 Observable,但在经过特定时间段后没有任何发出项的情况下,发出错误通知Timestamp — 将时间戳附加到由 Observable 发出的每个项目Using — 创建一个与 Observable 具有相同生命周期的可处置资源评估一个或多个 Observable 或由 Observable 发出的项目的运算符
All — 确定由 Observable 发出的所有项目是否满足某些条件Amb — 给定两个或多个源 Observable,仅从最先发出项目的 Observable 中发出所有项目Contains — 确定 Observable 是否发出特定项目DefaultIfEmpty — 发出源 Observable 中的项目,或者如果源 Observable 没有发出任何项目,则发出默认项目SequenceEqual — 确定两个 Observable 是否发出相同的项目序列SkipUntil — 在第二个 Observable 发出项目之前,丢弃由 Observable 发出的项目SkipWhile — 在指定条件变为假之前,丢弃由 Observable 发出的项目TakeUntil — 在第二个 Observable 发射项目或终止后,丢弃由 Observable 发射的项目TakeWhile — 在指定条件变为 false 后,丢弃由 Observable 发射的项目对 Observable 发射的整个项目序列进行操作的运算符
Average — 计算由 Observable 发射的数字的平均值,并发射此平均值Concat — 发射来自两个或多个 Observable 的发射,而不交织它们Count — 统计源 Observable 发射的项目数量,只发射此值Max — 确定并发射由 Observable 发射的最大值项目Min — 确定并发射由 Observable 发射的最小值项目Reduce — 依次将函数应用于由 Observable 发射的每个项目,并发射最终值Sum — 计算由 Observable 发射的数字的总和,并发射此总和具有更精确控制订阅动态的特殊 Observable
Connect — 指示可连接 Observable 开始向其订阅者发射项目Publish — 将普通 Observable 转换为可连接 ObservableRefCount — 使可连接 Observable 类似于普通 ObservableReplay — 确保所有观察者看到相同的发射项目序列,即使他们在 Observable 开始发射项目后才订阅To — 将 Observable 转换为其他对象或数据结构这棵树可以帮助您找到您正在寻找的 ReactiveX Observable 运算符。
Future,它在 Observable 完成之前阻塞规范的、核心运算符名称以粗体显示。其他条目表示这些运算符的特定于语言的变体或 ReactiveX 核心运算符集之外的特殊运算符。
聚合全部模棱两可ambArrayambWithand_并且任何应用as_blockingasObservable断言相等异步操作异步函数平均平均双精度平均浮点数平均整数平均长整数阻塞阻塞第一个阻塞遍历阻塞可迭代阻塞最后阻塞最新阻塞最新阻塞下一个阻塞单一阻塞订阅缓冲区带计数的缓冲区带时间的缓冲区带时间或计数的缓冲区逐行缓存带初始容量的缓存情况转换捕获捕获错误捕获异常收集collect (RxScala 版本的 Filter)收集到组合最新组合最新延迟错误组合最新连接连接所有连接所有连接数组连接数组延迟错误连接数组急切连接延迟错误连接急切连接映射连接映射延迟错误连接映射急切连接映射急切延迟错误连接映射可迭代连接映射观察者连接映射到连接连接连接永远cons包含受控计数计数长整数创建循环去抖动解码如果为空则使用默认值推迟推迟 Future延迟延迟订阅带选择器延迟非具体化不同不同键直到更改时不同直到键更改时不同做执行操作完成后的操作完成时的操作完成时的操作doOnDisposedoOnEachdoOnErrordoOnLifecycledoOnNextdoOnRequestdoOnSubscribedoOnTerminatedoOnUnsubscribedoseqdoWhiledropdropRightdropUntildropWhile元素AtElementAtOrDefault空emptyObservableempty?encodeensureserroreveryexclusiveexistsexpandfailWith过滤filterNotFinallyfinallyActionfinallyDofindfindIndex第一个firstElementFirstOrDefaultfirstOrElse扁平映射flatMapFirstflatMapIterableflatMapIterableWithflatMapLatestflatMapObserverflatMapWithflatMapWithMaxConcurrentflat_map_with_indexflattenflattenDelayErrorfoldlfoldLeftforforallForEachforEachFutureforEachWhileforInforkJoin来自fromActionfromArrayFromAsyncPatternfromCallablefromCallbackFromEventFromEventPatternfromFunc0fromFuturefromIterablefromIteratorfrom_listfromNodeCallbackfromPromisefromPublisherfromRunnableGenerategenerateWithAbsoluteTimegenerateWithRelativeTimegeneratorGetEnumeratorgetIterator分组GroupByUntilGroupJoinheadheadOptionheadOrElseififThen忽略元素indexOfinterleaveinterpose间隔intervalRangeintoisEmptyitems加入join (字符串)jortSortjortSortUntil只keepkeep-indexed最后lastElementlastOptionLastOrDefaultlastOrElseLatestlatest (Rx.rb 版本的 Switch)lengthletletBindliftlimitLongCountManySelect映射map (RxClojure 版本的 Zip)MapCatmapCat (RxClojure 版本的 Zip)map-indexedmapTomapWithIndex具体化最大MaxBy合并mergeAllmergeArraymergeArrayDelayErrormerge_concurrentmergeDelayErrormergeObservablemergeWith最小MinByMostRecentMulticastmulticastWithSelectornest永不NextNext (BlockingObservable 版本)nonenonEmptynth观察OnObserveOnDispatcherobserveSingleOnofof_arrayofArrayChangesof_enumerableof_enumeratorofObjectChangesOfTypeofWithScheduleronBackpressureBlockonBackpressureBufferonBackpressureDropOnErrorResumeNextonErrorReturnonErrorReturnItemonExceptionResumeNextonTerminateDetachorElsepairspairwisepartitionpartition-allpausablepausableBufferedpluckproduct发布发布最后publish_synchronizedpublishValueraise_error范围ReducereduceWithreductions引用计数重复repeat_infinitelyrepeatUntilrepeatWhen重播rescue_errorrest重试retry_infinitelyretryUntilretryWhenReturnreturnElementreturnValuerunAsyncsafeSubscribe采样扫描scanWithscopeSelect (Map 的别名)select (Filter 的别名)selectConcatselectConcatObserverSelectManyselectManyObserverselect_switchselectSwitchselectSwitchFirstselectWithMaxConcurrentselect_with_indexseq序列相等sequence_eql?SequenceEqualWithSerializeshareshareReplayshareValueSinglesingleElementSingleOrDefaultsingleOptionsingleOrElsesize跳过跳过最后skipLastWithTime跳过直到skipUntilWithTime跳过直到skipWhileWithIndexskip_with_timesliceslidingslidingBuffersomesortsortedsort-bysorted-list-bysplitsplit-with开始startAsyncstartFuture以...开头startWithArraystringConcatstopAndWaitsubscribesubscribeActual订阅OnSubscribeOnDispatchersubscribeOnCompletedsubscribeOnErrorsubscribeOnNextsubscribeWith总和sumDoublesumFloatsumIntegersumLong切换switchCaseswitchIfEmptyswitchLatestswitchMapswitchMapDelayErrorswitchOnNextswitchOnNextDelayErrorSynchronize获取take_with_timetakeFirstTakeLasttakeLastBuffertakeLastBufferWithTimetakeLastWithTimetakeRight (另见:TakeLast)获取直到takeUntilWithTime获取直到takeWhileWithIndextailtaptapOnCompletedtapOnErrortapOnNextThenthenDoThrottlethrottleFirstthrottleLastthrottleLatestthrottleWithSelectorthrottleWithTimeout抛出throwErrorthrowException时间间隔超时timeoutWithSelector计时器时间戳到to_aToArrayToAsynctoBlockingtoBufferto_dictToDictionaryToEnumerableToEventToEventPatternToFlowableToFutureto_htoIndexedSeqtoIterabletoIteratorToListToLookuptoMaptoMultiMapToObservabletoSettoSortedListtoStreamToTasktoTraversabletoVectortumblingtumblingBufferunsafeCreateunsubscribeOn使用WhenWherewhilewhileDo窗口windowWithCountwindowWithTimewindowWithTimeOrCountwindowedwithFilterwithLatestFrom拉链zipArrayzipIterablezipWithzipWithIndex+++::+