您可以使用 Take 操作符修改 Observable,只发出 Observable 发出的前 n 个项目,然后完成,并忽略其余项目。
take
待定
limit take
在 RxGroovy 中,此操作符实现为 take。
如果您在 Observable 上使用 take(n) 操作符(或其同义词 limit(n)),并且该 Observable 在完成之前发出的项目少于 n 个,则新的 take 修改后的 Observable 不会抛出异常或调用 onError,而只会发出这个相同的更少的项目数,然后完成。
take(n)
limit(n)
onError
numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8]); numbers.take(3).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
1 2 3 Sequence complete
这种 take 变体默认情况下不会对任何特定的 Scheduler 进行操作。
take(int)
take 还有一个变体,它采用时间持续时间而不是项目数量。它会生成一个 Observable,该 Observable 仅发出在源 Observable 生命周期初始持续时间内发出的那些项目。您可以通过将时间长度和该长度所表示的时间单位作为参数传递给 take 来设置此持续时间。
这种 take 变体默认情况下在 computation Scheduler 上运行,但您也可以选择性地将您选择的 Scheduler 作为可选的第三个参数传递。
computation
take(long,TimeUnit)
take(long,TimeUnit,Scheduler)
在 RxJava 中,此操作符实现为 take。
Observable.just(1, 2, 3, 4, 5, 6, 7, 8) .take(4) .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
Next: 1 Next: 2 Next: 3 Next: 4 Sequence complete.
take takeUntilWithTime
RxJS 实现 take 操作符。
var source = Rx.Observable.range(0, 5) .take(3); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 0 Next: 1 Next: 2 Completed
对于 take(0) 的特殊情况,您还可以将 Scheduler 作为第二个参数传递,take 将使用该参数立即安排对 onCompleted 的调用。
take(0)
onCompleted
take 存在于以下每个发行版中
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
RxJS 还实现了一个 takeUntilWithTime 操作符,它类似于 take,不同的是,它不会获取特定数量的项目,而是获取在初始时间段内发出的所有项目。您可以通过将参数传递给 takeUntilWithTime 来建立此时间段,格式如下
takeUntilWithTime
Date
您还可以选择性地将 Scheduler 作为第二个参数传递,计时器将在该 Scheduler 上运行(takeUntilWithTime 默认使用 timeout Scheduler)。
timeout
var source = Rx.Observable.timer(0, 1000) .takeUntilWithTime(5000); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 0 Next: 1 Next: 2 Next: 3 Next: 4 Completed
takeUntilWithTime 存在于以下每个发行版中
rx.time.js
Take
take takeUntil
RxPHP 将此操作符实现为 take。
从可观察序列的开头返回指定数量的连续元素。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/take/take.php $observable = Rx\Observable::fromArray([21, 42, 63]); $observable ->take(2) ->subscribe($stdoutObserver);
Next value: 21 Next value: 42 Complete!
RxPHP 还具有一个 takeUntil 操作符。
takeUntil
返回源可观察序列中的值,直到另一个可观察序列产生值。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/take/takeUntil.php $source = \Rx\Observable::interval(105) ->takeUntil(\Rx\Observable::timer(1000)); $subscription = $source->subscribe($stdoutObserver);
Next value: 0 Next value: 1 Next value: 2 Next value: 3 Next value: 4 Next value: 5 Next value: 6 Next value: 7 Next value: 8 Complete!
take take_with_time