Sample 操作符周期性地观察 Observable,并在每次采样时发射自上次采样以来 Observable 最近发射的项目。
在一些实现中,还有一个 ThrottleFirst 操作符,它与 Sample 类似,但它发射的不是采样周期内最近发射的项目,而是该周期内发射的 *第一个* 项目。
sample
待定
sample throttleFirst throttleLast
RxGroovy 通过 sample 和 throttleLast 实现此操作符。
throttleLast
请注意,如果源 Observable 自上次采样以来未发射任何项目,则此操作符生成的 Observable 在该采样周期内不会发射任何项目。
sample(或其别名 throttleLast)的一个变体按您选择的周期性时间间隔进行采样,方法是将 TimeUnit 和此类单位的数量作为参数传递给 sample。
TimeUnit
以下代码构造了一个 Observable,它发射 1 到一百万之间的数字,然后每 10 毫秒对该 Observable 进行采样,以查看它此时正在发射哪个数字。
def numbers = Observable.range( 1, 1000000 ); numbers.sample(10, java.util.concurrent.TimeUnit.MILLISECONDS).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
339707 547810 891282 Sequence complete
sample 的这种变体默认情况下在 computation Scheduler 上运行,但您可以选择性地将您选择的 Scheduler 作为第三个参数传递。
computation
sample(long,TimeUnit)
throttleLast(long,TimeUnit)
sample(long,TimeUnit,Scheduler)
throttleLast(long,TimeUnit,Scheduler)
还有一个 sample 变体(没有 throttleLast 别名),它在第二个 Observable 发射项目(或终止)时对源 Observable 进行采样。您将该第二个 Observable 作为参数传递给 sample。
sample 的这种变体默认情况下不会在任何特定的 Scheduler 上运行。
sample(Observable)
还有一个 throttleFirst 操作符,它与 throttleLast/sample 不同,它发射的是每个采样周期内源 Observable 发射的 *第一个* 项目,而不是 *最近发射的* 项目。
throttleFirst
Scheduler s = new TestScheduler(); PublishSubject<Integer> o = PublishSubject.create(); o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted ); // send events with simulated time increments s.advanceTimeTo(0, TimeUnit.MILLISECONDS); o.onNext(1); // deliver o.onNext(2); // skip s.advanceTimeTo(501, TimeUnit.MILLISECONDS); o.onNext(3); // deliver s.advanceTimeTo(600, TimeUnit.MILLISECONDS); o.onNext(4); // skip s.advanceTimeTo(700, TimeUnit.MILLISECONDS); o.onNext(5); // skip o.onNext(6); // skip s.advanceTimeTo(1001, TimeUnit.MILLISECONDS); o.onNext(7); // deliver s.advanceTimeTo(1501, TimeUnit.MILLISECONDS); o.onCompleted();
1 3 7 Sequence complete
throttleFirst 默认情况下在 computation Scheduler 上运行,但您可以选择性地将您选择的 Scheduler 作为第三个参数传递。
throttleFirst(long,TimeUnit)
throttleFirst(long,TimeUnit,Scheduler)
RxJava 通过 sample 和 throttleLast 实现此操作符。
sample throttleFirst
RxJS 通过 sample 的两个变体来实现此操作符。
第一个变体接受一个周期性作为其参数,定义为一个整数的毫秒数,它以该频率对源 Observable 进行周期性采样。
var source = Rx.Observable.interval(1000) .sample(5000) .take(2); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 3 Next: 8 Completed
第二个变体接受一个 Observable 作为其参数,并在该第二个 Observable 发射项目时对源 Observable 进行采样。
var source = Rx.Observable.interval(1000) .sample(Rx.Observable.interval(5000)) .take(2); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
还有一个 throttleFirst 操作符,它与 sample 不同,它发射的是每个采样周期内源 Observable 发射的 *第一个* 项目,而不是 *最近发射的* 项目。
它没有使用来自第二个 Observable 的发射来调节采样周期性的变体。
var times = [ { value: 0, time: 100 }, { value: 1, time: 600 }, { value: 2, time: 400 }, { value: 3, time: 900 }, { value: 4, time: 200 } ]; // Delay each item by time and project value; var source = Rx.Observable.from(times) .flatMap(function (item) { return Rx.Observable .of(item.value) .delay(item.time); }) .throttleFirst(300 /* ms */); var subscription = source.subscribe( function (x) { console.log('Next: %s', x); }, function (err) { console.log('Error: %s', err); }, function () { console.log('Completed'); });
Next: 0 Next: 2 Next: 3 Completed
sample 和 throttleFirst 默认情况下在 timeout Scheduler 上运行。它们位于以下每个发行版中
timeout
rx.all.js
rx.all.compat.js
rx.time.js
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
Sample
sample throttle_first throttle_last
sample sampleLatest