Sample 操作符周期性地观察 Observable,并在每次采样时发射自上次采样以来 Observable 最近发射的项目。
在一些实现中,还有一个 ThrottleFirst 操作符,它与 Sample 类似,但它发射的不是采样周期内最近发射的项目,而是该周期内发射的 *第一个* 项目。
待定
待定
RxGroovy 通过 sample
和 throttleLast
实现此操作符。
请注意,如果源 Observable 自上次采样以来未发射任何项目,则此操作符生成的 Observable 在该采样周期内不会发射任何项目。
sample
(或其别名 throttleLast
)的一个变体按您选择的周期性时间间隔进行采样,方法是将 TimeUnit
和此类单位的数量作为参数传递给 sample
。
以下代码构造了一个 Observable,它发射 1 到一百万之间的数字,然后每 10 毫秒对该 Observable 进行采样,以查看它此时正在发射哪个数字。
def numbers = Observable.range( 1, 1000000 ); numbers.sample(10, java.util.concurrent.TimeUnit.MILLISECONDS).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
339707 547810 891282 Sequence complete
sample
的这种变体默认情况下在 computation
Scheduler 上运行,但您可以选择性地将您选择的 Scheduler 作为第三个参数传递。
还有一个 sample
变体(没有 throttleLast
别名),它在第二个 Observable 发射项目(或终止)时对源 Observable 进行采样。您将该第二个 Observable 作为参数传递给 sample
。
sample
的这种变体默认情况下不会在任何特定的 Scheduler 上运行。
sample(Observable)
还有一个 throttleFirst
操作符,它与 throttleLast
/sample
不同,它发射的是每个采样周期内源 Observable 发射的 *第一个* 项目,而不是 *最近发射的* 项目。
Scheduler s = new TestScheduler(); PublishSubject<Integer> o = PublishSubject.create(); o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted ); // send events with simulated time increments s.advanceTimeTo(0, TimeUnit.MILLISECONDS); o.onNext(1); // deliver o.onNext(2); // skip s.advanceTimeTo(501, TimeUnit.MILLISECONDS); o.onNext(3); // deliver s.advanceTimeTo(600, TimeUnit.MILLISECONDS); o.onNext(4); // skip s.advanceTimeTo(700, TimeUnit.MILLISECONDS); o.onNext(5); // skip o.onNext(6); // skip s.advanceTimeTo(1001, TimeUnit.MILLISECONDS); o.onNext(7); // deliver s.advanceTimeTo(1501, TimeUnit.MILLISECONDS); o.onCompleted();
1 3 7 Sequence complete
throttleFirst
默认情况下在 computation
Scheduler 上运行,但您可以选择性地将您选择的 Scheduler 作为第三个参数传递。
RxJava 通过 sample
和 throttleLast
实现此操作符。
请注意,如果源 Observable 自上次采样以来未发射任何项目,则此操作符生成的 Observable 在该采样周期内不会发射任何项目。
sample
(或其别名 throttleLast
)的一个变体按您选择的周期性时间间隔进行采样,方法是将 TimeUnit
和此类单位的数量作为参数传递给 sample
。
sample
的这种变体默认情况下在 computation
Scheduler 上运行,但您可以选择性地将您选择的 Scheduler 作为第三个参数传递。
还有一个 sample
变体(没有 throttleLast
别名),它在第二个 Observable 发射项目(或终止)时对源 Observable 进行采样。您将该第二个 Observable 作为参数传递给 sample
。
sample
的这种变体默认情况下不会在任何特定的 Scheduler 上运行。
sample(Observable)
还有一个 throttleFirst
操作符,它与 throttleLast
/sample
不同,它发射的是每个采样周期内源 Observable 发射的 *第一个* 项目,而不是 *最近发射的* 项目。
throttleFirst
默认情况下在 computation
Scheduler 上运行,但您可以选择性地将您选择的 Scheduler 作为第三个参数传递。
RxJS 通过 sample
的两个变体来实现此操作符。
第一个变体接受一个周期性作为其参数,定义为一个整数的毫秒数,它以该频率对源 Observable 进行周期性采样。
var source = Rx.Observable.interval(1000) .sample(5000) .take(2); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 3 Next: 8 Completed
第二个变体接受一个 Observable 作为其参数,并在该第二个 Observable 发射项目时对源 Observable 进行采样。
var source = Rx.Observable.interval(1000) .sample(Rx.Observable.interval(5000)) .take(2); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 3 Next: 8 Completed
还有一个 throttleFirst
操作符,它与 sample
不同,它发射的是每个采样周期内源 Observable 发射的 *第一个* 项目,而不是 *最近发射的* 项目。
它没有使用来自第二个 Observable 的发射来调节采样周期性的变体。
var times = [ { value: 0, time: 100 }, { value: 1, time: 600 }, { value: 2, time: 400 }, { value: 3, time: 900 }, { value: 4, time: 200 } ]; // Delay each item by time and project value; var source = Rx.Observable.from(times) .flatMap(function (item) { return Rx.Observable .of(item.value) .delay(item.time); }) .throttleFirst(300 /* ms */); var subscription = source.subscribe( function (x) { console.log('Next: %s', x); }, function (err) { console.log('Error: %s', err); }, function () { console.log('Completed'); });
Next: 0 Next: 2 Next: 3 Completed
sample
和 throttleFirst
默认情况下在 timeout
Scheduler 上运行。它们位于以下每个发行版中
rx.all.js
rx.all.compat.js
rx.time.js
(需要 rx.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
待定
待定