Take

仅发出 Observable 发出的前 n 个项目

您可以使用 Take 操作符修改 Observable,只发出 Observable 发出的前 n 个项目,然后完成,并忽略其余项目。

另请参阅

特定语言信息

待定

take

在 RxGroovy 中,此操作符实现为 take

如果您在 Observable 上使用 take(n) 操作符(或其同义词 limit(n)),并且该 Observable 在完成之前发出的项目少于 n 个,则新的 take 修改后的 Observable 不会抛出异常或调用 onError,而只会发出这个相同的更少的项目数,然后完成。

示例代码

numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8]);

numbers.take(3).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
1
2
3
Sequence complete

这种 take 变体默认情况下不会对任何特定的 Scheduler 进行操作。

take

take 还有一个变体,它采用时间持续时间而不是项目数量。它会生成一个 Observable,该 Observable 仅发出在源 Observable 生命周期初始持续时间内发出的那些项目。您可以通过将时间长度和该长度所表示的时间单位作为参数传递给 take 来设置此持续时间。

这种 take 变体默认情况下在 computation Scheduler 上运行,但您也可以选择性地将您选择的 Scheduler 作为可选的第三个参数传递。

take

在 RxJava 中,此操作符实现为 take

如果您在 Observable 上使用 take(n) 操作符(或其同义词 limit(n)),并且该 Observable 在完成之前发出的项目少于 n 个,则新的 take 修改后的 Observable 不会抛出异常或调用 onError,而只会发出这个相同的更少的项目数,然后完成。

示例代码

Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
          .take(4)
          .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.

这种 take 变体默认情况下不会对任何特定的 Scheduler 进行操作。

take

take 还有一个变体,它采用时间持续时间而不是项目数量。它会生成一个 Observable,该 Observable 仅发出在源 Observable 生命周期初始持续时间内发出的那些项目。您可以通过将时间长度和该长度所表示的时间单位作为参数传递给 take 来设置此持续时间。

这种 take 变体默认情况下在 computation Scheduler 上运行,但您也可以选择性地将您选择的 Scheduler 作为可选的第三个参数传递。

take

RxJS 实现 take 操作符。

示例代码

var source = Rx.Observable.range(0, 5)
    .take(3);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 0
Next: 1
Next: 2
Completed
take

对于 take(0) 的特殊情况,您还可以将 Scheduler 作为第二个参数传递,take 将使用该参数立即安排对 onCompleted 的调用。

take 存在于以下每个发行版中

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
takeUntilWithTime

RxJS 还实现了一个 takeUntilWithTime 操作符,它类似于 take,不同的是,它不会获取特定数量的项目,而是获取在初始时间段内发出的所有项目。您可以通过将参数传递给 takeUntilWithTime 来建立此时间段,格式如下

数字
从 Observable 订阅开始算起,镜像源 Observable 中的项目,直到经过这么多毫秒。
Date
镜像源 Observable 中的项目,直到到达此绝对时间。

您还可以选择性地将 Scheduler 作为第二个参数传递,计时器将在该 Scheduler 上运行(takeUntilWithTime 默认使用 timeout Scheduler)。

示例代码

var source = Rx.Observable.timer(0, 1000)
    .takeUntilWithTime(5000);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Completed

takeUntilWithTime 存在于以下每个发行版中

  • rx.all.js
  • rx.all.compat.js
  • rx.time.js(需要 rx.jsrx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

待定

待定

RxPHP 将此操作符实现为 take

从可观察序列的开头返回指定数量的连续元素。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/take/take.php

$observable = Rx\Observable::fromArray([21, 42, 63]);
$observable
    ->take(2)
    ->subscribe($stdoutObserver);

   
Next value: 21
Next value: 42
Complete!
    

RxPHP 还具有一个 takeUntil 操作符。

返回源可观察序列中的值,直到另一个可观察序列产生值。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/take/takeUntil.php

$source = \Rx\Observable::interval(105)
    ->takeUntil(\Rx\Observable::timer(1000));

$subscription = $source->subscribe($stdoutObserver);


   
Next value: 0
Next value: 1
Next value: 2
Next value: 3
Next value: 4
Next value: 5
Next value: 6
Next value: 7
Next value: 8
Complete!
    

待定

待定

待定