主题是一种在某些 ReactiveX 实现中可用的桥梁或代理,它既充当观察者又充当可观察对象。由于它是一个观察者,它可以订阅一个或多个可观察对象,并且由于它是一个可观察对象,它可以通过重新发射来传递它观察到的项目,并且它还可以发射新的项目。
由于主题订阅了可观察对象,它将触发该可观察对象开始发射项目(如果该可观察对象是“冷的”——也就是说,如果它在开始发射项目之前等待订阅)。这可能会使生成的主题成为原始“冷”可观察对象的“热”可观察对象变体。
在大多数 ReactiveX 实现中,特别是那些可以在多线程环境中运行的实现中,主题在其观察者方面不被认为是线程安全的。但是,可观察对象端,即 Subscribe()
始终是线程安全的。
这意味着从多个线程调用 OnNext
、OnError
或 OnCompleted
会导致未定义的状态。
因此,大多数 ReactiveX 实现提供了一个特殊的运算符,它也使观察者方面线程安全。查找 ToSerialized
运算符。
有四种类型的 主题
为特定用例而设计。并非所有这些都在所有实现中都可用,并且某些实现使用其他命名约定(例如,在 RxScala 中,这里称为“PublishSubject”的主题被称为“Subject”)。
AsyncSubject
仅在源可观察对象完成之后,才发射源可观察对象发射的最后一个值(并且仅发射最后一个值)。(如果源可观察对象未发射任何值,AsyncSubject
也会在不发射任何值的情况下完成。)
它还将把这个相同的最终值发射给任何后续的观察者。但是,如果源可观察对象以错误终止,AsyncSubject
不会发射任何项目,而只会将错误通知从源可观察对象传递下去。
当观察者订阅 BehaviorSubject
时,它将从发射源可观察对象最近发射的项目(或者如果尚未发射任何项目,则发射种子/默认值)开始,然后继续发射源可观察对象(s) 之后发射的任何其他项目。
但是,如果源可观察对象以错误终止,BehaviorSubject
不会将任何项目发射给后续的观察者,而只会将错误通知从源可观察对象传递下去。
PublishSubject
仅将订阅时间之后由源可观察对象(s) 发射的项目发射给观察者。
请注意,PublishSubject
可能会在创建时立即开始发射项目(除非您已采取措施阻止此行为),因此在创建主题和观察者订阅主题之间可能会丢失一个或多个项目。如果您需要保证从源可观察对象传递所有项目,您需要使用 Create
形成该可观察对象,这样您就可以手动重新引入“冷”可观察对象行为(检查所有观察者是否已订阅,然后才开始发射项目),或者改用 ReplaySubject
。
如果源可观察对象以错误终止,PublishSubject
不会将任何项目发射给后续的观察者,而只会将错误通知从源可观察对象传递下去。
ReplaySubject
将将源可观察对象(s) 发射的所有项目发射给任何观察者,无论观察者何时订阅。
还存在 ReplaySubject
版本,它会在重播缓冲区威胁到超过一定大小,或者在项目最初发射后的指定时间段过去时丢弃旧项目。
如果您将 ReplaySubject
用作观察者,请注意不要从多个线程调用它的 onNext
方法(或它的其他 on
方法),因为这会导致巧合的(非顺序)调用,这违反了 可观察对象契约,并会在生成的主题中创建一个关于哪个项目或通知应首先重播的歧义。
在某些 ReactiveX 风格和版本中,例如 RxJava 3.x,有两种额外的主题类型可用,它们可以满足一些额外的常见角色。
一个主题,它会将事件排队,直到一个观察者订阅它,将这些事件重播给它,直到观察者赶上来,然后切换到将事件实时地传递给这个观察者,直到这个 UnicastSubject 终止或观察者释放。
表示一个热单值类似的事件源和消费者,类似于主题。由于单值只能发射一个项目或错误,因此 SingleSubject 隐式地是一个重播类似的主题。
表示一个热 Maybe 类似的事件源和消费者,类似于主题。由于 Maybe 只能发射一个项目、一个错误或完成,因此 MaybeSubject 隐式地是一个重播类似的主题。
表示一个热 Completable 类似的事件源和消费者,类似于主题。由于 Completable 只能完成或保存错误,因此 CompletableSubject 隐式地是一个重播类似的主题。
RxJava 2.x 和 RxJava 3.x 将反压感知主题定义为处理器,它们的命名与上述其他主题非常相似。它们的运行方式基本相同,除了它们不会在订阅者无法请求更多项目时溢出订阅者。通常,这些主题不会在订阅者之间协调,如果无法跟上,可能会单独使它们失败。
一个特殊的处理器,MulticastProcessor
如上图所示,在反压方面会协调其订阅者。
SingleSubject
、MaybeSubject
和 CompletableSubject
主题类型没有处理器变体,因为它们不需要支持反压,并且始终最多只能保存一个元素。
处理器还实现了 Reactive Streams Processor 接口,因此它们与 Java 中的 Reactive Streams 生态系统兼容。
待定
待定
如果您有一个 主题
并且您想将它传递给其他代理而不暴露其 Subscriber
接口,您可以通过调用它的 asObservable
方法来掩盖它,该方法将返回主题作为纯 Observable
。
AsyncSubject
BehaviorSubject
PublishSubject
ReplaySubject
如果您有一个 主题
并且您想将它传递给其他代理而不暴露其 Subscriber
接口,您可以通过调用它的 asObservable
方法来掩盖它,该方法将返回主题作为纯 Observable
。
AsyncSubject
BehaviorSubject
PublishSubject
ReplaySubject
如果您有一个 主题
并且您想将它传递给其他代理而不暴露其 Observer
接口,您可以通过调用它的 hide
方法来掩盖它,该方法将返回主题作为纯 Observable
。
AsyncSubject
BehaviorSubject
PublishSubject
ReplaySubject
UnicastSubject
CompletableSubject
MaybeSubject
SingleSubject
待定
待定
AsyncSubject
BehaviorSubject
ReplaySubject
待定
待定
待定