调度器

如果您希望将多线程引入您的 Observable 运算符级联中,您可以通过指示这些运算符(或特定 Observable)在特定的 调度器 上运行来实现。

一些 ReactiveX Observable 运算符具有接受调度器作为参数的变体。这些会指示运算符在特定调度器上完成部分或全部工作。

默认情况下,Observable 和您应用于它的运算符链将完成其工作,并在调用其 Subscribe 方法的相同线程上通知其观察者。 SubscribeOn 运算符通过指定 Observable 应该在其上运行的不同调度器来改变这种行为。 ObserveOn 运算符指定 Observable 将用于向其观察者发送通知的不同调度器。

如本图所示,SubscribeOn 运算符指定 Observable 将在其上开始运行的线程,无论该运算符在运算符链中的哪个位置被调用。另一方面,ObserveOn 会影响 Observable 在该运算符出现的下方使用的线程。出于这个原因,您可以在 Observable 运算符链中的不同位置多次调用 ObserveOn,以便更改某些运算符在其上运行的线程。

ObserveOn and SubscribeOn

另请参阅

特定于语言的信息

待定

待定

调度器的种类

您可以从 Schedulers 中描述的工厂方法获取调度器。下表显示了通过 RxGroovy 中这些方法可用的调度器种类

调度器用途
Schedulers.computation( )适用于计算工作,例如事件循环和回调处理;不要将此调度器用于 I/O(请改用 Schedulers.io( ));默认情况下,线程数等于处理器数量
Schedulers.from(executor)使用指定的 Executor 作为调度器
Schedulers.immediate( )调度工作立即在当前线程上开始
Schedulers.io( )适用于 I/O 密集型工作,例如异步执行阻塞 I/O,此调度器由一个线程池支持,该线程池会根据需要增长;对于普通计算工作,请切换到 Schedulers.computation( );默认情况下,Schedulers.io(&#8239) 是一个 CachedThreadScheduler,它类似于一个具有线程缓存的新线程调度器
Schedulers.newThread( )为每个工作单元创建一个新线程
Schedulers.trampoline( )将工作排队以在当前线程上开始,在任何已排队的工

RxGroovy Observable 运算符的默认调度器

RxGroovy 中的一些 Observable 运算符具有备选形式,允许您设置运算符将用于(至少部分)其操作的调度器。其他运算符不会在任何特定调度器上运行,或者在特定默认调度器上运行。具有特定默认调度器的运算符包括

运算符调度器
buffer(timespan)computation
buffer(timespan, count)computation
buffer(timespan, timeshift)computation
debounce(timeout, unit)computation
delay(delay, unit)computation
delaySubscription(delay, unit)computation
intervalcomputation
repeattrampoline
replay(time, unit)computation
replay(buffersize, time, unit)computation
replay(selector, time, unit)computation
replay(selector, buffersize, time, unit)computation
retrytrampoline
sample(period, unit)computation
skip(time, unit)computation
skipLast(time, unit)computation
take(time, unit)computation
takeLast(time, unit)computation
takeLast(count, time, unit)computation
takeLastBuffer(time, unit)computation
takeLastBuffer(count, time, unit)computation
throttleFirstcomputation
throttleLastcomputation
throttleWithTimeoutcomputation
timeIntervalcomputation
timeout(timeoutSelector)immediate
timeout(firstTimeoutSelector, timeoutSelector)immediate
timeout(timeoutSelector, other)immediate
timeout(timeout, timeUnit)computation
timeout(firstTimeoutSelector, timeoutSelector, other)immediate
timeout(timeout, timeUnit, other)computation
timercomputation
timestampcomputation
window(timespan)computation
window(timespan, count)computation
window(timespan, timeshift)computation

测试调度器

TestScheduler 允许您对调度器的时钟行为进行精确的手动控制。这对于测试依赖于时间中动作精确安排的交互很有用。此调度器具有三种额外的方法

advanceTimeTo(time,unit)
将调度器的时钟提前到特定时间点
advanceTimeBy(time,unit)
将调度器的时钟向前推进特定时间量
triggerActions( )
启动任何尚未启动的动作,这些动作已计划在调度器时钟

另请参阅

调度器的种类

您可以从 Schedulers 中描述的工厂方法获取调度器。下表显示了通过 RxJava 中这些方法可用的调度器种类

调度器用途
Schedulers.computation( )适用于计算工作,例如事件循环和回调处理;不要将此调度器用于 I/O(请改用 Schedulers.io( ));默认情况下,线程数等于处理器数量
Schedulers.from(executor)使用指定的 Executor 作为调度器
Schedulers.immediate( )调度工作立即在当前线程上开始
Schedulers.io( )适用于 I/O 密集型工作,例如异步执行阻塞 I/O,此调度器由一个线程池支持,该线程池会根据需要增长;对于普通计算工作,请切换到 Schedulers.computation( );默认情况下,Schedulers.io(&#8239) 是一个 CachedThreadScheduler,它类似于一个具有线程缓存的新线程调度器
Schedulers.newThread( )为每个工作单元创建一个新线程
Schedulers.trampoline( )将工作排队以在当前线程上开始,在任何已排队的工

RxJava 1.x Observable 运算符的默认调度器

RxJava 中的一些 Observable 运算符具有备选形式,允许您设置运算符将用于(至少部分)其操作的调度器。其他运算符不会在任何特定调度器上运行,或者在特定默认调度器上运行。具有特定默认调度器的运算符包括

运算符调度器
buffer(timespan)computation
buffer(timespan, count)computation
buffer(timespan, timeshift)computation
debounce(timeout, unit)computation
delay(delay, unit)computation
delaySubscription(delay, unit)computation
intervalcomputation
repeattrampoline
replay(time, unit)computation
replay(buffersize, time, unit)computation
replay(selector, time, unit)computation
replay(selector, buffersize, time, unit)computation
retrytrampoline
sample(period, unit)computation
skip(time, unit)computation
skipLast(time, unit)computation
take(time, unit)computation
takeLast(time, unit)computation
takeLast(count, time, unit)computation
takeLastBuffer(time, unit)computation
takeLastBuffer(count, time, unit)computation
throttleFirstcomputation
throttleLastcomputation
throttleWithTimeoutcomputation
timeIntervalcomputation
timeout(timeoutSelector)immediate
timeout(firstTimeoutSelector, timeoutSelector)immediate
timeout(timeoutSelector, other)immediate
timeout(timeout, timeUnit)computation
timeout(firstTimeoutSelector, timeoutSelector, other)immediate
timeout(timeout, timeUnit, other)computation
timercomputation
timestampcomputation
window(timespan)computation
window(timespan, count)computation
window(timespan, timeshift)computation

使用调度器

除了将这些调度器传递给 RxJava Observable 运算符外,您还可以使用它们在订阅上调度您自己的工作。以下示例使用 schedule 方法 Scheduler.Worker 来调度 newThread 调度器上的工作

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
    }

});
// some time later...
worker.unsubscribe();

递归调度器

要调度递归调用,您可以使用 schedule,然后在 Worker 对象上使用 schedule(this)

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
        // recurse until unsubscribed (schedule will do nothing if unsubscribed)
        worker.schedule(this);
    }

});
// some time later...
worker.unsubscribe();

检查或设置取消订阅状态

Worker 类的对象实现了 Subscription 接口,及其 isUnsubscribedunsubscribe 方法,因此您可以在取消订阅时停止工作,或者您可以在计划的任务中取消订阅

Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {

    @Override
    public void call() {
        while(!worker.isUnsubscribed()) {
            status = yourWork();
            if(QUIT == status) { worker.unsubscribe(); }
        }
    }

});

Worker 也是一个 Subscription,因此您可以(而且应该最终)调用其 unsubscribe 方法来表示它可以停止工作并释放资源

worker.unsubscribe();

延迟和周期性调度器

您还可以使用 schedule 的一个版本,该版本会延迟在给定调度器上执行您的操作,直到经过特定时间跨度。以下示例安排在 someScheduler 上执行 someAction,在该调度器的时钟显示经过 500 毫秒之后

someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);

另一个 Scheduler 方法 允许您安排在定期时间间隔执行操作。以下示例安排在 someScheduler 上执行 someAction,在该调度器的时钟显示经过 500 毫秒之后,然后每隔 250 毫秒执行一次

someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);

测试调度器

TestScheduler 允许您对调度器的时钟行为进行精确的手动控制。这对于测试依赖于时间中动作精确安排的交互很有用。此调度器具有三种额外的方法

advanceTimeTo(time,unit)
将调度器的时钟提前到特定时间点
advanceTimeBy(time,unit)
将调度器的时钟向前推进特定时间量
triggerActions( )
启动任何尚未启动的动作,这些动作已计划在调度器时钟

另请参阅

</div> </div> </div>

在 RxJS 中,您可以从 Rx.Scheduler 对象或作为独立实现的对象获取调度器。下表显示了在 RxJS 中可用的调度器种类。

调度器用途
Rx.Scheduler.currentThread尽快在当前线程上调度工作
Rx.HistoricalScheduler调度工作,就好像它在任意历史时间发生一样
Rx.Scheduler.immediate立即在当前线程上调度工作
Rx.TestScheduler用于单元测试;这允许您手动操作时间的推移
Rx.Scheduler.timeout通过定时回调调度工作

另请参阅

待定

待定

待定

待定

待定

</div>