如果您希望将多线程引入您的 Observable 运算符级联中,您可以通过指示这些运算符(或特定 Observable)在特定的 调度器 上运行来实现。
一些 ReactiveX Observable 运算符具有接受调度器作为参数的变体。这些会指示运算符在特定调度器上完成部分或全部工作。
待定
待定
您可以从 Schedulers
类 中描述的工厂方法获取调度器。下表显示了通过 RxGroovy 中这些方法可用的调度器种类
调度器 | 用途 |
---|---|
Schedulers.computation( ) | 适用于计算工作,例如事件循环和回调处理;不要将此调度器用于 I/O(请改用 Schedulers.io( ) );默认情况下,线程数等于处理器数量 |
Schedulers.from(executor) | 使用指定的 Executor 作为调度器 |
Schedulers.immediate( ) | 调度工作立即在当前线程上开始 |
Schedulers.io( ) | 适用于 I/O 密集型工作,例如异步执行阻塞 I/O,此调度器由一个线程池支持,该线程池会根据需要增长;对于普通计算工作,请切换到 Schedulers.computation( ) ;默认情况下,Schedulers.io( ) 是一个 CachedThreadScheduler ,它类似于一个具有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个工作单元创建一个新线程 |
Schedulers.trampoline( ) | 将工作排队以在当前线程上开始,在任何已排队的工 |
RxGroovy 中的一些 Observable 运算符具有备选形式,允许您设置运算符将用于(至少部分)其操作的调度器。其他运算符不会在任何特定调度器上运行,或者在特定默认调度器上运行。具有特定默认调度器的运算符包括
TestScheduler
允许您对调度器的时钟行为进行精确的手动控制。这对于测试依赖于时间中动作精确安排的交互很有用。此调度器具有三种额外的方法
advanceTimeTo(time,unit)
advanceTimeBy(time,unit)
triggerActions( )
您可以从 Schedulers
类 中描述的工厂方法获取调度器。下表显示了通过 RxJava 中这些方法可用的调度器种类
调度器 | 用途 |
---|---|
Schedulers.computation( ) | 适用于计算工作,例如事件循环和回调处理;不要将此调度器用于 I/O(请改用 Schedulers.io( ) );默认情况下,线程数等于处理器数量 |
Schedulers.from(executor) | 使用指定的 Executor 作为调度器 |
Schedulers.immediate( ) | 调度工作立即在当前线程上开始 |
Schedulers.io( ) | 适用于 I/O 密集型工作,例如异步执行阻塞 I/O,此调度器由一个线程池支持,该线程池会根据需要增长;对于普通计算工作,请切换到 Schedulers.computation( ) ;默认情况下,Schedulers.io( ) 是一个 CachedThreadScheduler ,它类似于一个具有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个工作单元创建一个新线程 |
Schedulers.trampoline( ) | 将工作排队以在当前线程上开始,在任何已排队的工 |
RxJava 中的一些 Observable 运算符具有备选形式,允许您设置运算符将用于(至少部分)其操作的调度器。其他运算符不会在任何特定调度器上运行,或者在特定默认调度器上运行。具有特定默认调度器的运算符包括
除了将这些调度器传递给 RxJava Observable 运算符外,您还可以使用它们在订阅上调度您自己的工作。以下示例使用 schedule
方法 Scheduler.Worker
类 来调度 newThread
调度器上的工作
worker = Schedulers.newThread().createWorker(); worker.schedule(new Action0() { @Override public void call() { yourWork(); } }); // some time later... worker.unsubscribe();
要调度递归调用,您可以使用 schedule
,然后在 Worker 对象上使用 schedule(this)
worker = Schedulers.newThread().createWorker(); worker.schedule(new Action0() { @Override public void call() { yourWork(); // recurse until unsubscribed (schedule will do nothing if unsubscribed) worker.schedule(this); } }); // some time later... worker.unsubscribe();
Worker
类的对象实现了 Subscription
接口,及其 isUnsubscribed
和 unsubscribe
方法,因此您可以在取消订阅时停止工作,或者您可以在计划的任务中取消订阅
Worker worker = Schedulers.newThread().createWorker(); Subscription mySubscription = worker.schedule(new Action0() { @Override public void call() { while(!worker.isUnsubscribed()) { status = yourWork(); if(QUIT == status) { worker.unsubscribe(); } } } });
Worker
也是一个 Subscription
,因此您可以(而且应该最终)调用其 unsubscribe
方法来表示它可以停止工作并释放资源
worker.unsubscribe();
您还可以使用 schedule
的一个版本,该版本会延迟在给定调度器上执行您的操作,直到经过特定时间跨度。以下示例安排在 someScheduler
上执行 someAction
,在该调度器的时钟显示经过 500 毫秒之后
someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);
另一个 Scheduler
方法 允许您安排在定期时间间隔执行操作。以下示例安排在 someScheduler
上执行 someAction
,在该调度器的时钟显示经过 500 毫秒之后,然后每隔 250 毫秒执行一次
someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
TestScheduler
允许您对调度器的时钟行为进行精确的手动控制。这对于测试依赖于时间中动作精确安排的交互很有用。此调度器具有三种额外的方法
advanceTimeTo(time,unit)
advanceTimeBy(time,unit)
triggerActions( )
在 RxJS 中,您可以从 Rx.Scheduler
对象或作为独立实现的对象获取调度器。下表显示了在 RxJS 中可用的调度器种类。
调度器 | 用途 |
---|---|
Rx.Scheduler.currentThread | 尽快在当前线程上调度工作 |
Rx.HistoricalScheduler | 调度工作,就好像它在任意历史时间发生一样 |
Rx.Scheduler.immediate | 立即在当前线程上调度工作 |
Rx.TestScheduler | 用于单元测试;这允许您手动操作时间的推移 |
Rx.Scheduler.timeout | 通过定时回调调度工作 |
待定
待定
待定
待定
待定