在响应式扩展中,很容易出现 Observable 比操作符或观察者更快地发出项的情况。这会导致一个问题:如何处理不断增长的未消费项的积压。
例如,想象一下使用 Zip 操作符将两个无限 Observable 压缩在一起,其中一个 Observable 的发出频率是另一个 Observable 的两倍。该操作符的简单实现将不得不维护一个不断扩展的缓冲区,用于保存更快 Observable 发出的项,以最终与较慢 Observable 发出的项结合在一起。这会导致响应式扩展占用大量的系统资源。
在响应式扩展中,您可以采用多种策略来控制流和背压,以缓解快速生成 Observable 遇到缓慢消费观察者的问题,这些策略包括,在某些响应式扩展实现中,响应式拉式背压和一些特定于背压的操作符。
一个 冷 Observable 发出一个特定的项序列,但可以在其观察者发现方便时开始发出此序列,并且可以在观察者所需的任何速率下发出序列,而不会破坏序列的完整性。例如,如果您将一个静态可迭代对象转换为一个 Observable,那么无论何时订阅它或以何种频率观察这些项,该 Observable 都会发出相同的项序列。冷 Observable 发出的项示例可能包括数据库查询、文件检索或网络请求的结果。
一个 热 Observable 在创建时立即开始生成要发出的项。订阅者通常从热 Observable 发出的项序列的中间某个位置开始观察该序列,从 Observable 在建立订阅后发出的第一个项开始。这样的 Observable 以其自己的速度发出项,由其观察者决定是否跟上。热 Observable 发出的项示例可能包括鼠标和键盘事件、系统事件或股票价格。
当一个冷 Observable 被多播(当它被转换为可连接 Observable 并且其 Connect 方法被调用)时,它实际上就变成了热 Observable,就背压和流控制而言,应该将其视为热 Observable。
冷 Observable 非常适合某些响应式扩展实现(在其他地方有描述)实现的响应式拉式背压模型。热 Observable 通常不善于处理响应式拉式模型,而是更适合其他流控制策略,例如使用本页描述的操作符,或像 Buffer、Sample、Debounce 或 Window 这样的操作符。
待定
RxGroovy 实施响应式拉式背压,并且其许多操作符都支持这种形式的背压。它还具有三个可以应用于未编写为支持背压的 Observable 的操作符
onBackpressureBuffer
维护一个缓冲区,用于保存来自源 Observable 的所有未观察到的发射,并根据下游观察者生成的请求将它们发射给下游观察者。
RxGroovy 1.1 中引入的此操作符的一个版本允许您设置缓冲区的容量;应用此操作符会导致生成的 Observable 在缓冲区溢出时以错误终止。在同一个版本中引入的第二个版本允许您设置一个 Action
,onBackpressureBuffer
在缓冲区溢出时会调用该 Action
。
onBackpressureBuffer()
onBackpressureBuffer(long)
(RxGroovy 1.1)onBackpressureBuffer(long, Action0)
(RxGroovy 1.1)
onBackpressureDrop
丢弃来自源 Observable 的发射,除非下游观察者有一个挂起的请求,在这种情况下,它将发射足够的项以满足该请求。
在 1.1 版本中引入的此操作符的一个版本会通过您作为参数传递的 Action
通知您何时丢弃了某个项以及丢弃了哪个项。
onBackpressureDrop()
onBackpressureLatest
(RxJava 1.1 中的新增内容)会保留来自源 Observable 的最新发射的项,并在请求时立即将该项发射给其观察者。它会丢弃它在观察者请求之间观察到的任何其他项。
onBackpressureLatest()
RxJava 实施响应式拉式背压,并且其许多操作符都支持这种形式的背压。它还具有三个可以应用于未编写为支持背压的 Observable 的操作符
onBackpressureBuffer
维护一个缓冲区,用于保存来自源 Observable 的所有未观察到的发射,并根据下游观察者生成的请求将它们发射给下游观察者。
在 RxJava 1.1 中引入的此操作符的一个版本允许您设置缓冲区的容量;应用此操作符会导致生成的 Observable 在缓冲区溢出时以错误终止。在同一个版本中引入的第二个版本允许您设置一个 Action
,onBackpressureBuffer
在缓冲区溢出时会调用该 Action
。
onBackpressureBuffer()
onBackpressureBuffer(long)
(RxJava 1.1)onBackpressureBuffer(long, Action0)
(RxJava 1.1)
onBackpressureDrop
丢弃来自源 Observable 的发射,除非下游观察者有一个挂起的请求,在这种情况下,它将发射足够的项以满足该请求。
在 1.1 版本中引入的此操作符的一个版本会通过您作为参数传递的 Action
通知您何时丢弃了某个项以及丢弃了哪个项。
onBackpressureDrop()
onBackpressureLatest
(RxJava 1.1 中的新增内容)会保留来自源 Observable 的最新发射的项,并在请求时立即将该项发射给其观察者。它会丢弃它在观察者请求之间观察到的任何其他项。
onBackpressureLatest()
RxJS 通过使用 controlled
操作符将普通 Observable 转换为 ControlledObservable
来实现背压。这强制 Observable 遵守其观察者的拉式 request
,而不是主动推送项。
作为使用 request
从 ControlledObservable
中拉取项的替代方法,您可以将 stopAndWait
操作符应用于它。此操作符将在其观察者的 onNext
例程接收到最新项时,从 Observable 请求一个新项。
第二种可能性是使用 windowed(
n)
。这与 stopAndWait
类似,但有一个 n 项的内部缓冲区,这使得 ControlledObservable
可以偶尔在观察者之前运行。windowed(1)
等效于 stopAndWait
。
还有两个操作符可以将普通 Observable 转换为 PausableObservable
。
如果您调用使用 pausable
操作符创建的 PausableObservable
的 pause
方法,它将丢弃(忽略)由底层源 Observable 发出的任何项,直到您调用其 resume
方法,此时它将继续将发射的项传递给其观察者。
如果您调用使用 pausableBuffered
操作符创建的 PausableObservable
的 pause
方法,它将缓冲由底层源 Observable 发出的任何项,直到您调用其 resume
方法,此时它将发出这些缓冲的项,然后继续将任何其他发射的项传递给其观察者。
待定
待定
待定
待定