背压操作符

应对 Observable 比其观察者更快地生成项的策略

在响应式扩展中,很容易出现 Observable 比操作符或观察者更快地发出项的情况。这会导致一个问题:如何处理不断增长的未消费项的积压。

例如,想象一下使用 Zip 操作符将两个无限 Observable 压缩在一起,其中一个 Observable 的发出频率是另一个 Observable 的两倍。该操作符的简单实现将不得不维护一个不断扩展的缓冲区,用于保存更快 Observable 发出的项,以最终与较慢 Observable 发出的项结合在一起。这会导致响应式扩展占用大量的系统资源。

在响应式扩展中,您可以采用多种策略来控制流和背压,以缓解快速生成 Observable 遇到缓慢消费观察者的问题,这些策略包括,在某些响应式扩展实现中,响应式拉式背压和一些特定于背压的操作符。

一个 冷 Observable 发出一个特定的项序列,但可以在其观察者发现方便时开始发出此序列,并且可以在观察者所需的任何速率下发出序列,而不会破坏序列的完整性。例如,如果您将一个静态可迭代对象转换为一个 Observable,那么无论何时订阅它或以何种频率观察这些项,该 Observable 都会发出相同的项序列。冷 Observable 发出的项示例可能包括数据库查询、文件检索或网络请求的结果。

一个 热 Observable 在创建时立即开始生成要发出的项。订阅者通常从热 Observable 发出的项序列的中间某个位置开始观察该序列,从 Observable 在建立订阅后发出的第一个项开始。这样的 Observable 以其自己的速度发出项,由其观察者决定是否跟上。热 Observable 发出的项示例可能包括鼠标和键盘事件、系统事件或股票价格。

当一个冷 Observable 被多播(当它被转换为可连接 Observable 并且其 Connect 方法被调用)时,它实际上就变成了热 Observable,就背压和流控制而言,应该将其视为热 Observable。

冷 Observable 非常适合某些响应式扩展实现(在其他地方有描述)实现的响应式拉式背压模型。热 Observable 通常不善于处理响应式拉式模型,而是更适合其他流控制策略,例如使用本页描述的操作符,或像 BufferSampleDebounceWindow 这样的操作符。

另见

特定于语言的信息

待定

RxGroovy 实施响应式拉式背压,并且其许多操作符都支持这种形式的背压。它还具有三个可以应用于未编写为支持背压的 Observable 的操作符

onBackpressureBuffer

onBackpressureBuffer 维护一个缓冲区,用于保存来自源 Observable 的所有未观察到的发射,并根据下游观察者生成的请求将它们发射给下游观察者。

RxGroovy 1.1 中引入的此操作符的一个版本允许您设置缓冲区的容量;应用此操作符会导致生成的 Observable 在缓冲区溢出时以错误终止。在同一个版本中引入的第二个版本允许您设置一个 ActiononBackpressureBuffer 在缓冲区溢出时会调用该 Action

onBackpressureDrop

onBackpressureDrop 丢弃来自源 Observable 的发射,除非下游观察者有一个挂起的请求,在这种情况下,它将发射足够的项以满足该请求。

在 1.1 版本中引入的此操作符的一个版本会通过您作为参数传递的 Action 通知您何时丢弃了某个项以及丢弃了哪个项。

onBackpressureLatest

onBackpressureLatest(RxJava 1.1 中的新增内容)会保留来自源 Observable 的最新发射的项,并在请求时立即将该项发射给其观察者。它会丢弃它在观察者请求之间观察到的任何其他项。

RxJava 实施响应式拉式背压,并且其许多操作符都支持这种形式的背压。它还具有三个可以应用于未编写为支持背压的 Observable 的操作符

onBackpressureBuffer

onBackpressureBuffer 维护一个缓冲区,用于保存来自源 Observable 的所有未观察到的发射,并根据下游观察者生成的请求将它们发射给下游观察者。

在 RxJava 1.1 中引入的此操作符的一个版本允许您设置缓冲区的容量;应用此操作符会导致生成的 Observable 在缓冲区溢出时以错误终止。在同一个版本中引入的第二个版本允许您设置一个 ActiononBackpressureBuffer 在缓冲区溢出时会调用该 Action

onBackpressureDrop

onBackpressureDrop 丢弃来自源 Observable 的发射,除非下游观察者有一个挂起的请求,在这种情况下,它将发射足够的项以满足该请求。

在 1.1 版本中引入的此操作符的一个版本会通过您作为参数传递的 Action 通知您何时丢弃了某个项以及丢弃了哪个项。

onBackpressureLatest

onBackpressureLatest(RxJava 1.1 中的新增内容)会保留来自源 Observable 的最新发射的项,并在请求时立即将该项发射给其观察者。它会丢弃它在观察者请求之间观察到的任何其他项。

RxJS 通过使用 controlled 操作符将普通 Observable 转换为 ControlledObservable 来实现背压。这强制 Observable 遵守其观察者的拉式 request,而不是主动推送项。

stopAndWait

作为使用 requestControlledObservable 中拉取项的替代方法,您可以将 stopAndWait 操作符应用于它。此操作符将在其观察者的 onNext 例程接收到最新项时,从 Observable 请求一个新项。

windowed

第二种可能性是使用 windowed(n)。这与 stopAndWait 类似,但有一个 n 项的内部缓冲区,这使得 ControlledObservable 可以偶尔在观察者之前运行。windowed(1) 等效于 stopAndWait

还有两个操作符可以将普通 Observable 转换为 PausableObservable

pausable

如果您调用使用 pausable 操作符创建的 PausableObservablepause 方法,它将丢弃(忽略)由底层源 Observable 发出的任何项,直到您调用其 resume 方法,此时它将继续将发射的项传递给其观察者。

另见

pausableBuffered

如果您调用使用 pausableBuffered 操作符创建的 PausableObservablepause 方法,它将缓冲由底层源 Observable 发出的任何项,直到您调用其 resume 方法,此时它将发出这些缓冲的项,然后继续将任何其他发射的项传递给其观察者。

另见

待定

待定

待定

待定