可观察对象

在 ReactiveX 中,一个观察者 订阅一个可观察对象。然后该观察者会对可观察对象发出的任何项目或项目序列做出反应。这种模式有利于并发操作,因为它不需要在等待可观察对象发出对象时阻塞,而是创建了一个观察者形式的哨兵,该哨兵随时准备在可观察对象将来发出对象时以适当的方式做出反应。

此页面解释了什么是反应式模式以及可观察对象和观察者是什么(以及观察者如何订阅可观察对象)。其他页面展示了如何使用各种可观察对象操作符来链接可观察对象并改变它们的行为。

此文档在其解释中使用了“弹珠图”。以下是弹珠图如何表示可观察对象和可观察对象的转换

另请参阅

背景

在许多软件编程任务中,您或多或少地期望您编写的指令将按顺序、一次一个地按您编写的顺序执行和完成。但在 ReactiveX 中,许多指令可能并行执行,它们的结果稍后会以任意顺序被“观察者”捕获。与其调用一个方法,您定义一个用于检索和转换数据的机制(以“可观察对象”的形式),然后订阅一个观察者到它,此时之前定义的机制就会生效,观察者会随时待命以捕获并响应它的发射,无论它们何时准备就绪。

这种方法的一个优点是,当您有一堆彼此不依赖的任务时,您可以同时启动所有任务,而不是等待每个任务完成后再启动下一个任务——这样,您的整个任务包只需要完成最长任务的时间。

有许多术语用于描述这种异步编程和设计模型。本文档将使用以下术语:一个观察者 订阅一个可观察对象。可观察对象发出项目或向其观察者发送通知,方法是调用观察者的方法。

在其他文档和其他上下文中,我们称之为“观察者”的东西有时被称为“订阅者”、“观察者”或“反应器”。这种模型通常被称为“反应器模式”.

建立观察者

此页面在其示例中使用类似 Groovy 的伪代码,但 ReactiveX 在许多语言中都有实现。

在普通的函数调用中(即不是 ReactiveX 中常见的异步、并行调用)——流程是这样的

  1. 调用一个方法。
  2. 将该方法的返回值存储在一个变量中。
  3. 使用该变量及其新值来做一些有用的事情。

或者,是这样的

// make the call, assign its return value to `returnVal`
returnVal = someMethod(itsParameters);
// do something useful with returnVal

在异步模型中,流程更像是这样

  1. 定义一个方法,该方法使用异步调用的返回值执行一些有用的操作;该方法是观察者的一部分。
  2. 将异步调用本身定义为一个可观察对象
  3. 通过订阅观察者将其附加到该可观察对象(这也启动了可观察对象的动作)。
  4. 继续您的工作;无论何时调用返回,观察者的方法都将开始对其返回值或值(可观察对象发出的项目)进行操作。

这看起来像这样

// defines, but does not invoke, the Subscriber's onNext handler
// (in this example, the observer is very simple and has only an onNext handler)
def myOnNext = { it -> do something useful with it };
// defines, but does not invoke, the Observable
def myObservable = someObservable(itsParameters);
// subscribes the Subscriber to the Observable, and invokes the Observable
myObservable.subscribe(myOnNext);
// go on about my business

onNext、onCompleted 和 onError

Subscribe 方法是将观察者连接到可观察对象的方式。您的观察者实现以下方法的一些子集

onNext
每当可观察对象发出项目时,可观察对象都会调用此方法。此方法将可观察对象发出的项目作为参数。
onError
可观察对象调用此方法以指示它无法生成预期数据或遇到了其他错误。它不会再调用onNextonCompletedonError方法将导致错误的原因作为其参数。
onCompleted
可观察对象在最后一次调用onNext后调用此方法,前提是它没有遇到任何错误。

根据可观察对象契约的条款,它可能会调用onNext零次或多次,然后可能会在这些调用之后调用onCompletedonError,但不会同时调用两者,这将是它的最后一次调用。按照惯例,在本文件中,调用onNext通常称为“发射”项目,而调用onCompletedonError则称为“通知”。

一个更完整的subscribe调用示例如下

def myOnNext     = { item -> /* do something useful with item */ };
def myError      = { throwable -> /* react sensibly to a failed call */ };
def myComplete   = { /* clean up after the final response */ };
def myObservable = someMethod(itsParameters);
myObservable.subscribe(myOnNext, myError, myComplete);
// go on about my business

另请参阅

取消订阅

在某些 ReactiveX 实现中,存在一个专门的观察者接口Subscriber,它实现了unsubscribe方法。您可以调用此方法来指示订阅者不再对当前订阅的任何可观察对象感兴趣。然后,这些可观察对象可以选择停止生成要发出的新项目(如果它们没有其他感兴趣的观察者)。

取消订阅的结果将级联回应用于观察者订阅的可观察对象的运算符链中,这将导致链中的每个链接停止发出项目。但是,这并不保证立即发生,并且可观察对象有可能在没有观察者观察这些发射的情况下,即使在没有任何观察者的情况下,也能生成并尝试发出项目一段时间。

关于命名约定的几点说明

ReactiveX 的每种特定于语言的实现都有自己的命名怪癖。没有规范的命名标准,尽管实现之间有很多共同点。

此外,这些名称中的一些在其他上下文中具有不同的含义,或者在特定实现语言的习惯用法中显得笨拙。

例如,存在onEvent命名模式(例如onNextonCompletedonError)。在某些情况下,这些名称将指示事件处理程序注册的方法。然而,在 ReactiveX 中,它们命名事件处理程序本身。

“热”和“冷”可观察对象

可观察对象何时开始发出其项目序列?这取决于可观察对象。“热”可观察对象可能会在创建时就开始发出项目,因此任何后来订阅该可观察对象的观察者都可能在序列的中间开始观察。另一方面,“冷”可观察对象会等到观察者订阅它才会开始发出项目,因此这样的观察者保证可以从头开始看到整个序列。

在 ReactiveX 的某些实现中,还存在一种称为“可连接”可观察对象的东西。这样的可观察对象不会在调用其Connect方法之前开始发出项目,无论是否已经有观察者订阅它。

通过可观察对象操作符进行组合

可观察对象和观察者只是 ReactiveX 的开始。它们本身仅仅是标准观察者模式的微不足道的扩展,更适合处理事件序列而不是单个回调。

真正的力量来自“反应式扩展”(因此得名“ReactiveX”)——操作符,允许您转换、组合、操作和处理可观察对象发出的项目序列。

这些 Rx 操作符允许您以声明式的方式将异步序列组合在一起,并具有回调的所有效率优势,但没有与异步系统通常相关的嵌套回调处理程序的缺点。

此文档将有关各种操作符及其用法示例的信息分组到以下页面中

创建可观察对象
CreateDeferEmpty/Never/ThrowFromIntervalJustRangeRepeatStartTimer
转换可观察对象项目
BufferFlatMapGroupByMapScanWindow
过滤可观察对象
DebounceDistinctElementAtFilterFirstIgnoreElementsLastSampleSkipSkipLastTakeTakeLast
组合可观察对象
And/Then/WhenCombineLatestJoinMergeStartWithSwitchZip
错误处理操作符
CatchRetry
实用操作符
DelayDoMaterialize/DematerializeObserveOnSerializeSubscribeSubscribeOnTimeIntervalTimeoutTimestampUsing
条件和布尔操作符
AllAmbContainsDefaultIfEmptySequenceEqualSkipUntilSkipWhileTakeUntilTakeWhile
数学和聚合操作符
AverageConcatCountMaxMinReduceSum
转换可观察对象
To
可连接可观察对象操作符
ConnectPublishRefCountReplay
背压操作符
各种强制执行特定流控制策略的操作符

这些页面包含有关某些操作符的信息,这些操作符不是 ReactiveX 核心的一部分,但已在一种或多种特定于语言的实现和/或可选模块中实现。

链接操作符

大多数操作符对可观察对象进行操作并返回可观察对象。这允许您将这些操作符一个接一个地应用于链中。链中的每个操作符都会修改由前一个操作符的操作生成的可观察对象。

还有其他模式,例如构建器模式,其中特定类的多种方法对该类中的相同项目的项目进行操作,方法是通过方法的操作修改该对象。这些模式也允许您以类似的方式链接方法。但是,虽然在构建器模式中,方法在链中出现的顺序通常无关紧要,但对于可观察对象操作符,顺序很重要

可观察对象操作符链不会独立地对产生链的原始可观察对象进行操作,而是依次操作,每个操作符都对链中前一个操作符生成的观察对象进行操作。

空值

在某些 ReactiveX 实现中,例如RxJava 2.xRxJava 3.x,由于现在与Reactive Streams 规范的强制兼容性,null值不再被允许。

一般来说,null 是模棱两可的,因为没有好的方法来区分一个不存在的指示符和一个错误,它返回 null 而不是抛出异常。如果确实需要一个不存在的指示符,可以考虑使用 Optional<T>(在 Java 8+ 中)或类似的结构来包装一个可能为 nullT,并让它在链上传播。

是的,使用这种方法会引入内存开销和间接性(以及一些不便)。但是,如果仔细考虑,以这种方式支持 null 会在所有地方引入开销(正如 RxJava 1.x 所做的那样),即使在大多数情况下,这都是不必要的。

因此,RxJava 2.xRxJava 3.x 会积极检查参数是否为 null,从用户提供的回调中返回 null 值,并且几乎不会使用 null 调用用户提供的回调,除非在特定操作符的文档中另有说明。