如果您希望将多线程引入您的 Observable 运算符级联中,您可以通过指示这些运算符(或特定 Observable)在特定的 调度器 上运行来实现。
一些 ReactiveX Observable 运算符具有接受调度器作为参数的变体。这些会指示运算符在特定调度器上完成部分或全部工作。
默认情况下,Observable 和您应用于它的运算符链将完成其工作,并在调用其 Subscribe
方法的相同线程上通知其观察者。 SubscribeOn 运算符通过指定 Observable 应该在其上运行的不同调度器来改变这种行为。 ObserveOn 运算符指定 Observable 将用于向其观察者发送通知的不同调度器。
如本图所示,SubscribeOn 运算符指定 Observable 将在其上开始运行的线程,无论该运算符在运算符链中的哪个位置被调用。另一方面,ObserveOn 会影响 Observable 在该运算符出现的下方使用的线程。出于这个原因,您可以在 Observable 运算符链中的不同位置多次调用 ObserveOn,以便更改某些运算符在其上运行的线程。
待定
待定
您可以从 Schedulers
类 中描述的工厂方法获取调度器。下表显示了通过 RxGroovy 中这些方法可用的调度器种类
调度器 | 用途 |
---|---|
Schedulers.computation( ) | 适用于计算工作,例如事件循环和回调处理;不要将此调度器用于 I/O(请改用 Schedulers.io( ) );默认情况下,线程数等于处理器数量 |
Schedulers.from(executor) | 使用指定的 Executor 作为调度器 |
Schedulers.immediate( ) | 调度工作立即在当前线程上开始 |
Schedulers.io( ) | 适用于 I/O 密集型工作,例如异步执行阻塞 I/O,此调度器由一个线程池支持,该线程池会根据需要增长;对于普通计算工作,请切换到 Schedulers.computation( ) ;默认情况下,Schedulers.io( ) 是一个 CachedThreadScheduler ,它类似于一个具有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个工作单元创建一个新线程 |
Schedulers.trampoline( ) | 将工作排队以在当前线程上开始,在任何已排队的工 |
RxGroovy 中的一些 Observable 运算符具有备选形式,允许您设置运算符将用于(至少部分)其操作的调度器。其他运算符不会在任何特定调度器上运行,或者在特定默认调度器上运行。具有特定默认调度器的运算符包括
TestScheduler
允许您对调度器的时钟行为进行精确的手动控制。这对于测试依赖于时间中动作精确安排的交互很有用。此调度器具有三种额外的方法
advanceTimeTo(time,unit)
advanceTimeBy(time,unit)
triggerActions( )
您可以从 Schedulers
类 中描述的工厂方法获取调度器。下表显示了通过 RxJava 中这些方法可用的调度器种类
调度器 | 用途 |
---|---|
Schedulers.computation( ) | 适用于计算工作,例如事件循环和回调处理;不要将此调度器用于 I/O(请改用 Schedulers.io( ) );默认情况下,线程数等于处理器数量 |
Schedulers.from(executor) | 使用指定的 Executor 作为调度器 |
Schedulers.immediate( ) | 调度工作立即在当前线程上开始 |
Schedulers.io( ) | 适用于 I/O 密集型工作,例如异步执行阻塞 I/O,此调度器由一个线程池支持,该线程池会根据需要增长;对于普通计算工作,请切换到 Schedulers.computation( ) ;默认情况下,Schedulers.io( ) 是一个 CachedThreadScheduler ,它类似于一个具有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个工作单元创建一个新线程 |
Schedulers.trampoline( ) | 将工作排队以在当前线程上开始,在任何已排队的工 |
RxJava 中的一些 Observable 运算符具有备选形式,允许您设置运算符将用于(至少部分)其操作的调度器。其他运算符不会在任何特定调度器上运行,或者在特定默认调度器上运行。具有特定默认调度器的运算符包括
除了将这些调度器传递给 RxJava Observable 运算符外,您还可以使用它们在订阅上调度您自己的工作。以下示例使用 schedule
方法 Scheduler.Worker
类 来调度 newThread
调度器上的工作
worker = Schedulers.newThread().createWorker(); worker.schedule(new Action0() { @Override public void call() { yourWork(); } }); // some time later... worker.unsubscribe();
要调度递归调用,您可以使用 schedule
,然后在 Worker 对象上使用 schedule(this)
worker = Schedulers.newThread().createWorker(); worker.schedule(new Action0() { @Override public void call() { yourWork(); // recurse until unsubscribed (schedule will do nothing if unsubscribed) worker.schedule(this); } }); // some time later... worker.unsubscribe();
Worker
类的对象实现了 Subscription
接口,及其 isUnsubscribed
和 unsubscribe
方法,因此您可以在取消订阅时停止工作,或者您可以在计划的任务中取消订阅
Worker worker = Schedulers.newThread().createWorker(); Subscription mySubscription = worker.schedule(new Action0() { @Override public void call() { while(!worker.isUnsubscribed()) { status = yourWork(); if(QUIT == status) { worker.unsubscribe(); } } } });
Worker
也是一个 Subscription
,因此您可以(而且应该最终)调用其 unsubscribe
方法来表示它可以停止工作并释放资源
worker.unsubscribe();
您还可以使用 schedule
的一个版本,该版本会延迟在给定调度器上执行您的操作,直到经过特定时间跨度。以下示例安排在 someScheduler
上执行 someAction
,在该调度器的时钟显示经过 500 毫秒之后
someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);
另一个 Scheduler
方法 允许您安排在定期时间间隔执行操作。以下示例安排在 someScheduler
上执行 someAction
,在该调度器的时钟显示经过 500 毫秒之后,然后每隔 250 毫秒执行一次
someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
TestScheduler
允许您对调度器的时钟行为进行精确的手动控制。这对于测试依赖于时间中动作精确安排的交互很有用。此调度器具有三种额外的方法
advanceTimeTo(time,unit)
advanceTimeBy(time,unit)
triggerActions( )
在 RxJS 中,您可以从 Rx.Scheduler
对象或作为独立实现的对象获取调度器。下表显示了在 RxJS 中可用的调度器种类。
调度器 | 用途 |
---|---|
Rx.Scheduler.currentThread | 尽快在当前线程上调度工作 |
Rx.HistoricalScheduler | 调度工作,就好像它在任意历史时间发生一样 |
Rx.Scheduler.immediate | 立即在当前线程上调度工作 |
Rx.TestScheduler | 用于单元测试;这允许您手动操作时间的推移 |
Rx.Scheduler.timeout | 通过定时回调调度工作 |
待定
待定
待定
待定
待定