ReactiveX 的许多实现使用 “Scheduler
s” 来控制 Observable 在多线程环境中的线程之间转换。您可以通过 ObserveOn 运算符指示 Observable 将其通知发送到特定 Scheduler 上的观察者。
请注意,如果 ObserveOn 收到 onError
终止通知,它将立即转发该通知,并且不会等待缓慢的观察者首先接收它已知的任何尚未发出的项目。这可能意味着 onError
通知会跳过(并吞并)源 Observable 发出的项目,如上图所示。
SubscribeOn 运算符类似,但它指示 Observable 本身在指定的 Scheduler 上运行,以及在该 Scheduler 上通知其观察者。
默认情况下,Observable 以及您对其应用的运算符链将在调用其 Subscribe
方法的同一线程上执行其工作并通知其观察者。 SubscribeOn 运算符通过指定 Observable 应该在其上运行的另一个 Scheduler 来更改此行为。 ObserveOn 运算符指定 Observable 用于向其观察者发送通知的另一个 Scheduler。
如该图所示,SubscribeOn 运算符指定 Observable 将开始运行的线程,无论在运算符链中的哪个位置调用该运算符。另一方面,ObserveOn 影响 Observable 将在该运算符下方使用的线程。因此,您可以在 Observable 运算符链中的不同位置多次调用 ObserveOn 以更改某些运算符在其上运行的线程。
Scheduler
待定
要指定 Observable 应该在其上调用其观察者的 onNext
、onCompleted
和 onError
方法的 Scheduler,请使用 observeOn
运算符,并将相应的 Scheduler
传递给它。
observeOn(Scheduler)
要指定 Observable 应该在其上调用其观察者的 onNext
、onCompleted
和 onError
方法的 Scheduler,请使用 observeOn
运算符,并将相应的 Scheduler
传递给它。
observeOn(Scheduler)
要指定 Observable 应该在其上调用其观察者的 onNext
、onCompleted
和 onError
方法的 Scheduler,请使用 observeOn
运算符,并将相应的 Scheduler
传递给它。
/* Change from immediate scheduler to timeout */ var source = Rx.Observable.return(42, Rx.Scheduler.immediate) .observeOn(Rx.Scheduler.timeout); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 42 Completed
observeOn
存在于以下每个发行版中
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js