简介

ReactiveX 的每个语言特定实现都实现了一组运算符。尽管实现之间存在很多重叠,但也有一些运算符只在某些实现中实现。此外,每个实现都倾向于将其运算符命名为类似于在该语言的其他上下文中已经熟悉的类似方法。

链式运算符

大多数运算符对 Observable 进行操作并返回 Observable。这使您可以将这些运算符一个接一个地应用,形成一个链。链中的每个运算符都会修改前一个运算符操作产生的 Observable。

还存在其他模式,例如 Builder 模式,其中特定类的各种方法对该类的同一个项目进行操作,通过方法的操作修改该对象。这些模式也允许您以类似的方式对方法进行链式操作。但是,虽然在 Builder 模式中,方法在链中出现的顺序通常无关紧要,但对于 Observable 运算符来说,顺序很重要。

Observable 运算符链不会独立地对产生链的原始 Observable 进行操作,而是依次进行操作,每个运算符都会对链中紧邻其前面的运算符生成的 Observable 进行操作。

ReactiveX 的运算符

此页面首先列出了 ReactiveX 中可能被认为是“核心”的运算符,并链接到提供有关这些运算符工作原理以及特定语言特定 ReactiveX 版本如何实现这些运算符的更深入信息的页面。

接下来是一个“决策树”,它可以帮助您选择最适合您的用例的运算符。

最后,是 ReactiveX 的许多语言特定实现中提供的大多数运算符的字母顺序列表。这些链接到记录最接近语言特定运算符的核心运算符的页面(例如,Rx.NET 的“SelectMany”运算符链接到 FlatMap ReactiveX 运算符的文档,“SelectMany”是 Rx.NET 的实现)。

如果要实现自己的运算符,请参阅 实现自己的运算符.

内容

  1. 按类别分类的运算符
  2. Observable 运算符决策树
  3. Observable 运算符字母顺序列表

按类别分类的运算符

创建 Observable

生成新的 Observable 的运算符。

  • Create — 通过以编程方式调用观察者方法从头开始创建 Observable
  • Defer — 直到观察者订阅时才创建 Observable,并且为每个观察者创建一个新的 Observable
  • Empty/Never/Throw — 创建具有非常精确和有限行为的 Observable
  • From — 将其他对象或数据结构转换为 Observable
  • Interval — 创建一个 Observable,它按特定时间间隔发出一系列整数
  • Just — 将对象或一组对象转换为发出该对象或这些对象的 Observable
  • Range — 创建发出连续整数范围的 Observable
  • Repeat — 创建一个 Observable,它重复发出特定项目或一系列项目
  • Start — 创建一个 Observable,它发出函数的返回值
  • Timer — 创建一个 Observable,它在给定延迟后发出单个项目

转换 Observable

转换由 Observable 发出的项目的运算符。

  • Buffer — 定期将 Observable 中的项目收集到捆绑包中,并发出这些捆绑包,而不是一次发出一个项目
  • FlatMap — 将由 Observable 发出的项目转换为 Observable,然后将这些 Observable 的发出项展平成单个 Observable
  • GroupBy — 将 Observable 分成一组 Observable,每个 Observable 都发出原始 Observable 中的不同组项目,这些项目按键进行组织
  • Map — 通过对每个项目应用函数来转换由 Observable 发出的项目
  • Scan — 对由 Observable 发出的每个项目依次应用函数,并发出每个后续值
  • Window — 定期将 Observable 中的项目细分为 Observable 窗口,并发出这些窗口,而不是一次发出一个项目

过滤 Observable

选择性地从源 Observable 发出项目的运算符。

  • Debounce — 仅当经过特定时间段后 Observable 没有发出其他项目时,才从 Observable 发出项目
  • Distinct — 抑制由 Observable 发出的重复项目
  • ElementAt — 仅发出由 Observable 发出的第 n 个项目
  • Filter — 仅发出通过谓词测试的 Observable 中的项目
  • First — 仅发出第一个项目,或第一个满足条件的项目,从 Observable 中发出
  • IgnoreElements — 不从 Observable 发出任何项目,但镜像其终止通知
  • Last — 仅发出由 Observable 发出的最后一个项目
  • Sample — 每隔特定时间间隔发出 Observable 发出的最新项目
  • Skip — 抑制由 Observable 发出的前 n 个项目
  • SkipLast — 抑制由 Observable 发出的最后 n 个项目
  • Take — 仅发出由 Observable 发出的前 n 个项目
  • TakeLast — 仅发出由 Observable 发出的最后 n 个项目

组合 Observable

使用多个源 Observable 来创建单个 Observable 的运算符

  • And/Then/When — 通过 PatternPlan 中介,组合两个或多个 Observable 发出的项目集
  • CombineLatest — 当两个 Observable 中的任何一个发出项目时,通过指定函数组合每个 Observable 发出的最新项目,并根据该函数的结果发出项目
  • Join — 每当一个 Observable 在根据另一个 Observable 发出的项目定义的时间窗口内发出项目时,组合两个 Observable 发出的项目
  • Merge — 通过合并多个 Observable 的发出项将多个 Observable 组合成一个
  • StartWith — 在开始发出源 Observable 中的项目之前,发出指定的项目序列
  • Switch — 将发出 Observable 的 Observable 转换为单个 Observable,该 Observable 发出最近发出的那些 Observable 发出的项目
  • Zip — 通过指定函数将多个 Observable 的发出项组合在一起,并根据该函数的结果为每个组合发出单个项目

错误处理运算符

帮助从 Observable 的错误通知中恢复的运算符

  • Catch — 通过在没有错误的情况下继续序列来从 onError 通知中恢复
  • Retry — 如果源 Observable 发送 onError 通知,则重新订阅它,以期它将在没有错误的情况下完成

Observable 实用运算符

用于处理 Observable 的实用运算符工具箱

  • Delay — 将 Observable 的发出项在时间上向前移动特定量
  • Do — 注册一个操作,以便在 Observable 的各种生命周期事件发生时执行
  • Materialize/Dematerialize — 将发出的项目和发送的通知都表示为发出的项目,或反转此过程
  • ObserveOn — 指定观察者将观察此 Observable 的调度器
  • Serialize — 强制 Observable 进行序列化调用,并确保其行为良好
  • Subscribe — 对 Observable 的发出项和通知进行操作
  • SubscribeOn — 指定 Observable 在订阅时应使用的调度器
  • TimeInterval — 将发出项目的 Observable 转换为发出这些项目之间经过时间量的指示的 Observable
  • Timeout — 镜像源 Observable,但在经过特定时间段后没有任何发出项的情况下,发出错误通知
  • Timestamp — 将时间戳附加到由 Observable 发出的每个项目
  • Using — 创建一个与 Observable 具有相同生命周期的可处置资源

条件和布尔运算符

评估一个或多个 Observable 或由 Observable 发出的项目的运算符

  • All — 确定由 Observable 发出的所有项目是否满足某些条件
  • Amb — 给定两个或多个源 Observable,仅从最先发出项目的 Observable 中发出所有项目
  • Contains — 确定 Observable 是否发出特定项目
  • DefaultIfEmpty — 发出源 Observable 中的项目,或者如果源 Observable 没有发出任何项目,则发出默认项目
  • SequenceEqual — 确定两个 Observable 是否发出相同的项目序列
  • SkipUntil — 在第二个 Observable 发出项目之前,丢弃由 Observable 发出的项目
  • SkipWhile — 在指定条件变为假之前,丢弃由 Observable 发出的项目
  • TakeUntil — 在第二个 Observable 发射项目或终止后,丢弃由 Observable 发射的项目
  • TakeWhile — 在指定条件变为 false 后,丢弃由 Observable 发射的项目

数学和聚合运算符

对 Observable 发射的整个项目序列进行操作的运算符

  • Average — 计算由 Observable 发射的数字的平均值,并发射此平均值
  • Concat — 发射来自两个或多个 Observable 的发射,而不交织它们
  • Count — 统计源 Observable 发射的项目数量,只发射此值
  • Max — 确定并发射由 Observable 发射的最大值项目
  • Min — 确定并发射由 Observable 发射的最小值项目
  • Reduce — 依次将函数应用于由 Observable 发射的每个项目,并发射最终值
  • Sum — 计算由 Observable 发射的数字的总和,并发射此总和

背压运算符

  • 背压运算符 — 处理 Observable 比其观察者消耗得快的情况的策略

可连接 Observable 运算符

具有更精确控制订阅动态的特殊 Observable

  • Connect — 指示可连接 Observable 开始向其订阅者发射项目
  • Publish — 将普通 Observable 转换为可连接 Observable
  • RefCount — 使可连接 Observable 类似于普通 Observable
  • Replay — 确保所有观察者看到相同的发射项目序列,即使他们在 Observable 开始发射项目后才订阅

用于转换 Observable 的运算符

  • To — 将 Observable 转换为其他对象或数据结构

Observable 运算符决策树

这棵树可以帮助您找到您正在寻找的 ReactiveX Observable 运算符。

我想创建一个新的 Observable
发射一个特定的项目
在订阅时由函数返回的
开始
在订阅时由 ActionCallableRunnable 或类似的东西返回的
来自
经过指定的延迟
计时器
从特定的 ArrayIterable 或类似的东西中提取其发射
来自
通过从 Future 中检索它
开始
从 Future 中获取其序列
来自
重复发射项目序列
重复
从头开始,使用自定义逻辑
创建
对于每个订阅的观察者
推迟
发射一系列整数
范围
在特定的时间间隔
间隔
经过指定的延迟
计时器
在不发射项目的情况下完成
什么也不做
永不
我想通过组合其他 Observable 来创建一个 Observable
并以接收到的任何顺序发射来自所有 Observable 的所有项目
合并
并以一次一个 Observable 的方式发射来自所有 Observable 的所有项目
连接
通过按顺序组合来自两个或多个 Observable 的项目来创建要发射的新项目
每当每个 Observable 都发射了新项目时
拉链
每当任何 Observable 都发射了新项目时
组合最新
每当由另一个 Observable 发射的项目定义的窗口中发射由一个 Observable 的项目时
加入
通过 PatternPlan 中介
并且/然后/当
并仅发射来自那些 Observable 中最近发射的项目的项目
切换
我想在转换 Observable 发射的项目后发射它们
一次一个使用函数
映射
通过发射来自对应 Observable 的所有项目
扁平映射
一次一个 Observable,按照它们发射的顺序
连接映射
基于所有先前的项目
扫描
通过向它们附加时间戳
时间戳
成为发射项目之前经过的时间的指示
时间间隔
我想在重新发射 Observable 发射的项目之前,将它们在时间上向前移动
延迟
我想将来自 Observable 的项目通知转换为项目并重新发射它们
通过将它们包装在 Notification 对象中
具体化
然后我可以再次解开它们
非具体化
我想忽略 Observable 发射的所有项目,只传递它的完成/错误通知
忽略元素
我想镜像一个 Observable,但在其序列中添加前缀
以...开头
只有在它的序列为空的情况下
如果为空则使用默认值
我想收集来自 Observable 的项目,并将它们重新发射为项目缓冲区
缓冲区
只包含最后发射的项目
获取最后缓冲区
我想将一个 Observable 拆分为多个 Observable
窗口
以便类似的项目最终出现在同一个 Observable 上
分组
我想检索由 Observable 发射的特定项目
在它完成之前发射的最后一个项目
最后
它发射的唯一项目
Single
它发射的第一个项目
第一个
我只想重新发射来自 Observable 的某些项目
通过过滤掉不符合某些谓词的项目
过滤
即只有第一个项目
第一个
即只有第一个项目
获取
即只有最后一个项目
最后
即只有第 n 个项目
元素At
即只有第一个项目之后的那些项目
即第一个 n 个项目之后
跳过
即直到其中一个项目匹配谓词
跳过直到
即经过一段时间之后
跳过
即在第二个 Observable 发射项目之后
跳过直到
即除了最后一个项目之外的那些项目
即除了最后 n 个项目之外
跳过最后
即直到其中一个项目匹配谓词
获取直到
即除了源完成之前一段时间内发射的项目之外
跳过最后
即除了第二个 Observable 发射项目后发射的项目之外
获取直到
通过定期对 Observable 进行采样
采样
通过只发射那些在某些持续时间内没有被其他项目跟随的项目
去抖动
通过抑制已经发射的项目的重复项
不同
如果它们紧随其重复的项目
直到更改时不同
通过在它开始发射项目后延迟一段时间订阅它
延迟订阅
我只想重新发射来自 Observable 的项目,条件是它是发射项目的第一个 Observable 集合
模棱两可
我想评估 Observable 发射的整个项目序列
并发射一个布尔值,指示所有项目是否通过了某些测试
全部
并发射一个布尔值,指示 Observable 是否发射了任何项目(通过了某些测试)
包含
并发射一个布尔值,指示 Observable 是否没有发射任何项目
为空
并发射一个布尔值,指示序列是否与第二个 Observable 发射的序列相同
序列相等
并发射所有值的平均值
平均
并发射所有值的总和
总和
并发射一个数字,指示序列中有多少个项目
计数
并发射具有最大值的项目
最大
并发射具有最小值的项目
最小
通过依次将聚合函数应用于每个项目,并发射结果
扫描
我想将 Observable 发射的整个项目序列转换为其他数据结构
我想要一个运算符在特定的 调度器 上运行
订阅On
当它通知观察者时
观察On
我想要一个 Observable 在某些事件发生时调用特定操作
我想要一个 Observable 通知观察者错误
抛出
如果在它不发射项目的情况下经过了指定的时间段
超时
我想让 Observable 从容地恢复
通过切换到备用 Observable 从超时中恢复
超时
从上游错误通知中恢复
捕获
通过尝试重新订阅上游 Observable
重试
我想创建一个与 Observable 具有相同生命周期的资源
使用
我想订阅 Observable 并接收一个 Future,它在 Observable 完成之前阻塞
开始
我想要一个 Observable,它在被要求之前不会开始向订阅者发射项目
发布
然后只发射其序列中的最后一个项目
发布最后
然后发射完整的序列,即使那些在序列开始后订阅的序列
重播
但我希望它在所有订阅者取消订阅后消失
引用计数
然后我想要求它开始
连接

另请参阅

Observable 运算符字母顺序列表

规范的、核心运算符名称以粗体显示。其他条目表示这些运算符的特定于语言的变体或 ReactiveX 核心运算符集之外的特殊运算符。