在响应式扩展中,很容易出现 Observable 比操作符或观察者更快地发出项的情况。这会导致一个问题:如何处理不断增长的未消费项的积压。
例如,想象一下使用 Zip 操作符将两个无限 Observable 压缩在一起,其中一个 Observable 的发出频率是另一个 Observable 的两倍。该操作符的简单实现将不得不维护一个不断扩展的缓冲区,用于保存更快 Observable 发出的项,以最终与较慢 Observable 发出的项结合在一起。这会导致响应式扩展占用大量的系统资源。
在响应式扩展中,您可以采用多种策略来控制流和背压,以缓解快速生成 Observable 遇到缓慢消费观察者的问题,这些策略包括,在某些响应式扩展实现中,响应式拉式背压和一些特定于背压的操作符。
一个 冷 Observable 发出一个特定的项序列,但可以在其观察者发现方便时开始发出此序列,并且可以在观察者所需的任何速率下发出序列,而不会破坏序列的完整性。例如,如果您将一个静态可迭代对象转换为一个 Observable,那么无论何时订阅它或以何种频率观察这些项,该 Observable 都会发出相同的项序列。冷 Observable 发出的项示例可能包括数据库查询、文件检索或网络请求的结果。
一个 热 Observable 在创建时立即开始生成要发出的项。订阅者通常从热 Observable 发出的项序列的中间某个位置开始观察该序列,从 Observable 在建立订阅后发出的第一个项开始。这样的 Observable 以其自己的速度发出项,由其观察者决定是否跟上。热 Observable 发出的项示例可能包括鼠标和键盘事件、系统事件或股票价格。
当一个冷 Observable 被多播(当它被转换为可连接 Observable 并且其 Connect 方法被调用)时,它实际上就变成了热 Observable,就背压和流控制而言,应该将其视为热 Observable。
冷 Observable 非常适合某些响应式扩展实现(在其他地方有描述)实现的响应式拉式背压模型。热 Observable 通常不善于处理响应式拉式模型,而是更适合其他流控制策略,例如使用本页描述的操作符,或像 Buffer、Sample、Debounce 或 Window 这样的操作符。
待定
RxGroovy 实施响应式拉式背压,并且其许多操作符都支持这种形式的背压。它还具有三个可以应用于未编写为支持背压的 Observable 的操作符
RxJava 实施响应式拉式背压,并且其许多操作符都支持这种形式的背压。它还具有三个可以应用于未编写为支持背压的 Observable 的操作符
RxJS 通过使用 controlled
操作符将普通 Observable 转换为 ControlledObservable
来实现背压。这强制 Observable 遵守其观察者的拉式 request
,而不是主动推送项。
还有两个操作符可以将普通 Observable 转换为 PausableObservable
。
待定
待定
待定
待定