您可以使用 Take 操作符修改 Observable,只发出 Observable 发出的前 n 个项目,然后完成,并忽略其余项目。
take
待定
在 RxGroovy 中,此操作符实现为 take
。
如果您在 Observable 上使用 take(n)
操作符(或其同义词 limit(n)
),并且该 Observable 在完成之前发出的项目少于 n 个,则新的 take
修改后的 Observable 不会抛出异常或调用 onError
,而只会发出这个相同的更少的项目数,然后完成。
numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8]); numbers.take(3).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
1 2 3 Sequence complete
这种 take
变体默认情况下不会对任何特定的 Scheduler 进行操作。
take(int)
take
还有一个变体,它采用时间持续时间而不是项目数量。它会生成一个 Observable,该 Observable 仅发出在源 Observable 生命周期初始持续时间内发出的那些项目。您可以通过将时间长度和该长度所表示的时间单位作为参数传递给 take
来设置此持续时间。
这种 take
变体默认情况下在 computation
Scheduler 上运行,但您也可以选择性地将您选择的 Scheduler 作为可选的第三个参数传递。
take(long,TimeUnit)
take(long,TimeUnit,Scheduler)
在 RxJava 中,此操作符实现为 take
。
如果您在 Observable 上使用 take(n)
操作符(或其同义词 limit(n)
),并且该 Observable 在完成之前发出的项目少于 n 个,则新的 take
修改后的 Observable 不会抛出异常或调用 onError
,而只会发出这个相同的更少的项目数,然后完成。
Observable.just(1, 2, 3, 4, 5, 6, 7, 8) .take(4) .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
Next: 1 Next: 2 Next: 3 Next: 4 Sequence complete.
这种 take
变体默认情况下不会对任何特定的 Scheduler 进行操作。
take(int)
take
还有一个变体,它采用时间持续时间而不是项目数量。它会生成一个 Observable,该 Observable 仅发出在源 Observable 生命周期初始持续时间内发出的那些项目。您可以通过将时间长度和该长度所表示的时间单位作为参数传递给 take
来设置此持续时间。
这种 take
变体默认情况下在 computation
Scheduler 上运行,但您也可以选择性地将您选择的 Scheduler 作为可选的第三个参数传递。
take(long,TimeUnit)
take(long,TimeUnit,Scheduler)
RxJS 实现 take
操作符。
var source = Rx.Observable.range(0, 5) .take(3); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 0 Next: 1 Next: 2 Completed
对于 take(0)
的特殊情况,您还可以将 Scheduler 作为第二个参数传递,take
将使用该参数立即安排对 onCompleted
的调用。
take
存在于以下每个发行版中
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
RxJS 还实现了一个 takeUntilWithTime
操作符,它类似于 take
,不同的是,它不会获取特定数量的项目,而是获取在初始时间段内发出的所有项目。您可以通过将参数传递给 takeUntilWithTime
来建立此时间段,格式如下
Date
您还可以选择性地将 Scheduler 作为第二个参数传递,计时器将在该 Scheduler 上运行(takeUntilWithTime
默认使用 timeout
Scheduler)。
var source = Rx.Observable.timer(0, 1000) .takeUntilWithTime(5000); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 0 Next: 1 Next: 2 Next: 3 Next: 4 Completed
takeUntilWithTime
存在于以下每个发行版中
rx.all.js
rx.all.compat.js
rx.time.js
(需要 rx.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
待定
RxPHP 将此操作符实现为 take
。
从可观察序列的开头返回指定数量的连续元素。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/take/take.php $observable = Rx\Observable::fromArray([21, 42, 63]); $observable ->take(2) ->subscribe($stdoutObserver);
Next value: 21 Next value: 42 Complete!
RxPHP 还具有一个 takeUntil
操作符。
返回源可观察序列中的值,直到另一个可观察序列产生值。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/take/takeUntil.php $source = \Rx\Observable::interval(105) ->takeUntil(\Rx\Observable::timer(1000)); $subscription = $source->subscribe($stdoutObserver);
Next value: 0 Next value: 1 Next value: 2 Next value: 3 Next value: 4 Next value: 5 Next value: 6 Next value: 7 Next value: 8 Complete!
待定
待定
待定