Do

注册一个操作,以便在 Observable 的各种生命周期事件发生时执行。

Do

您可以注册回调,ReactiveX 将在 Observable 上发生某些事件时调用这些回调,其中这些回调将独立于与 Observable 级联相关的正常通知集进行调用。 各种 ReactiveX 实现设计了各种运算符来允许这样做。

另请参阅

特定于语言的信息

待定

RxGroovy 有几个 Do 变体。

doOnEach

doOnEach 运算符允许您建立一个回调,结果 Observable 将在每次它发出一个项目时调用该回调。 您可以以接受 onNext 种类 Notification 作为其唯一参数的 Action 的形式传递此回调,或者您可以传入一个 Observer,其 onNext 方法将被调用,就好像它订阅了 Observable 一样。

doOnNext

doOnNext 运算符非常类似于 doOnEach(Action1),只是您作为参数传递给它的 Action 不会接受 Notification,而是简单地接受发出的项目。

doOnRequest 运算符(RxGroovy 1.1 中的新增功能)注册一个 Action,该 Action 将在观察者请求结果 Observable 中的更多项目时被调用。 该 Action 以观察者请求的项目数量作为其参数。

doOnSubscribe

doOnSubscribe 运算符注册一个 Action,该 Action 将在观察者订阅结果 Observable 时被调用。

doOnUnsubscribe

doOnUnsubscribe 运算符注册一个 Action,该 Action 将在观察者取消订阅结果 Observable 时被调用。

doOnCompleted

doOnCompleted 运算符注册一个 Action,该 Action 将在结果 Observable 正常终止时被调用,调用 onCompleted

doOnError

doOnError 运算符注册一个 Action,该 Action 将在结果 Observable 非正常终止时被调用,调用 onError。 此 Action 将传递表示错误的 Throwable

doOnTerminate

doOnTerminate 运算符注册一个 Action,该 Action 将在结果 Observable 终止之前被调用,无论正常终止还是错误终止。

finallyDo

finallyDo 运算符注册一个 Action,该 Action 将在结果 Observable 终止之后被调用,无论正常终止还是错误终止。

示例代码

def numbers = Observable.from([1, 2, 3, 4, 5]);

numbers.finallyDo({ println('Finally'); }).subscribe(
   { println(it); },                          // onNext
   { println("Error: " + it.getMessage()); }, // onError
   { println("Sequence complete"); }          // onCompleted
);
1
2
3
4
5
Sequence complete
Finally

RxJava 有几个 Do 变体。

doOnEach

doOnEach 运算符允许您建立一个回调,结果 Observable 将在每次它发出一个项目时调用该回调。 您可以以接受 onNext 种类 Notification 作为其唯一参数的 Action 的形式传递此回调,或者您可以传入一个 Observer,其 onNext 方法将被调用,就好像它订阅了 Observable 一样。

doOnNext

doOnNext 运算符非常类似于 doOnEach(Action1),只是您作为参数传递给它的 Action 不会接受 Notification,而是简单地接受发出的项目。

示例代码

Observable.just(1, 2, 3)
          .doOnNext(new Action1<Integer>() {
          @Override
          public void call(Integer item) {
            if( item > 1 ) {
              throw new RuntimeException( "Item exceeds maximum value" );
            }
          }
        }).subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });
Next: 1
Error: Item exceeds maximum value

doOnRequest 运算符(RxJava 1.1 中的新增功能)注册一个 Action,该 Action 将在观察者请求结果 Observable 中的更多项目时被调用。 该 Action 以观察者请求的项目数量作为其参数。

doOnSubscribe

doOnSubscribe 运算符注册一个 Action,该 Action 将在观察者订阅结果 Observable 时被调用。

doOnUnsubscribe

doOnUnsubscribe 运算符注册一个 Action,该 Action 将在观察者取消订阅结果 Observable 时被调用。

doOnCompleted

doOnCompleted 运算符注册一个 Action,该 Action 将在结果 Observable 正常终止时被调用,调用 onCompleted

doOnError

doOnError 运算符注册一个 Action,该 Action 将在结果 Observable 非正常终止时被调用,调用 onError。 此 Action 将传递表示错误的 Throwable

doOnTerminate

doOnTerminate 运算符注册一个 Action,该 Action 将在结果 Observable 终止之前被调用,无论正常终止还是错误终止。

finallyDo

finallyDo 从 RxJava 1.1.1 开始被弃用,取而代之的是具有相同行为的 doAfterTerminate

finallyDo 运算符注册一个 Action,该 Action 将在结果 Observable 终止之后被调用,无论正常终止还是错误终止。

doAfterTerminate

doAfterTerminate 运算符注册一个 Action,该 Action 将在结果 Observable 终止之后被调用,无论正常终止还是错误终止。

do

RxJS 将基本 Do 运算符实现为 dotap(两个运算符的名称相同)。 您有两个选择可以使用此运算符

  1. 您可以将 Observer 传递给它,在这种情况下,do/tap 将调用该 Observer 的方法,就好像该 Observer 订阅了结果 Observable 一样。
  2. 您可以传递一组 1-3 个单独的函数(onNextonErroronCompleted),do/tap 将与任何观察者的同名函数一起调用这些函数。

示例代码

/* Using an observer */
var observer = Rx.Observer.create(
  function (x) { console.log('Do Next: %s', x); },
  function (err) { console.log('Do Error: %s', err); },
  function () { console.log('Do Completed'); }
);

var source = Rx.Observable.range(0, 3)
    .do(observer);

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });
Do Next: 0
Next: 0
Do Next: 1
Next: 1
Do Next: 2
Next: 2
Do Completed
Completed
/* Using a function */
var source = Rx.Observable.range(0, 3)
  .do(
    function (x)   { console.log('Do Next:', x); },
    function (err) { console.log('Do Error:', err); },
    function ()    { console.log('Do Completed'); }
  );

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });
Do Next: 0
Next: 0
Do Next: 1
Next: 1
Do Next: 2
Next: 2
Do Completed
Completed
doOnNext

RxJS 还实现了 doOnNexttapOnNext(两个运算符的名称相同)。 它是 Do 的一种专用形式,它只对 onNext 案例做出响应,通过调用您作为参数提供的回调函数。 您也可以选择传递第二个参数,该参数将是执行回调函数时的回调函数的“this”对象。

示例代码

var source = Rx.Observable.range(0, 3)
  .doOnNext(
    function () { this.log('Do Next: %s', x); },
    console
  );

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });
Do Next: 0
Next: 0
Do Next: 1
Next: 1
Do Next: 2
Next: 2
Completed
doOnError

RxJS 还实现了 doOnErrortapOnError(两个运算符的名称相同)。 它是 Do 的一种专用形式,它只对 onError 案例做出响应,通过调用您作为参数提供的回调函数。 您也可以选择传递第二个参数,该参数将是执行回调函数时的回调函数的“this”对象。

示例代码

var source = Rx.Observable.throw(new Error());
  .doOnError(
    function (err) { this.log('Do Error: %s', err); },
    console
  );

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });
Do Error: Error
Error: Error
doOnCompleted

RxJS 还实现了 doOnCompletedtapOnCompleted(两个运算符的名称相同)。 它是 Do 的一种专用形式,它只对 onCompleted 案例做出响应,通过调用您作为参数提供的回调函数。 您也可以选择传递第二个参数,该参数将是执行回调函数时的回调函数的“this”对象。

示例代码

var source = Rx.Observable.range(0, 3)
  .doOnCompleted(
    function () { this.log('Do Completed'); },
    console
  );

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });
Next: 0
Next: 1
Next: 2
Do Completed
Completed
finally

RxJS 还实现了一个 finally 运算符。 它接受一个函数,该函数将在结果 Observable 终止后被调用,无论正常终止(onCompleted)还是非正常终止(onError)。

示例代码

var source = Rx.Observable.throw(new Error())
    .finally(function () {
        console.log('Finally');
    });

var subscription = source.subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Error: Error
Finally

do/tapdoOnNext/tapOnNextdoOnError/tapOnErrordoOnCompleted/tapOnCompletedfinally 在以下每个发行版中都能找到

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

RxPHP 将此运算符实现为 do

对可观察序列中的每个元素调用一个操作,并在可观察序列优雅或异常终止时调用一个操作。 此方法可用于调试、日志记录等查询行为,通过拦截消息流来对管道上的消息运行任意操作。 使用 do 时,请务必注意,观察者可能会在流完成或出错后收到更多事件(例如,使用 repeat 或重新订阅时)。 如果您使用的是扩展 AbstractObservable 的 Observable,则不会收到这些事件。 对于这种情况,请使用 DoObserver。 doOnNext、doOnError 和 doOnCompleted 在内部使用 DoObserver,并将收到这些额外的事件。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/do.php

$source = \Rx\Observable::range(0, 3)
    ->do(
        function ($x) {
            echo 'Do Next:', $x, PHP_EOL;
        },
        function (Throwable $err) {
            echo 'Do Error:', $err->getMessage(), PHP_EOL;
        },
        function () {
            echo 'Do Completed', PHP_EOL;
        }
    );

$subscription = $source->subscribe($stdoutObserver);
   
Do Next:0
Next value: 0
Do Next:1
Next value: 1
Do Next:2
Next value: 2
Do Completed
Complete!
    

RxPHP 还具有一个 doOnError 运算符。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/doOnError.php

$source = \Rx\Observable::error(new Exception('Oops'))
    ->doOnError(function (Throwable $err) {
        echo 'Do Error:', $err->getMessage(), PHP_EOL;
    });

$subscription = $source->subscribe($stdoutObserver);
   
Do Error:Oops
Exception: Oops
    

RxPHP 还具有一个 doOnCompleted 运算符。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/doOnCompleted.php

$source = \Rx\Observable::empty()
    ->doOnCompleted(function () {
        echo 'Do Completed', PHP_EOL;
    });

$subscription = $source->subscribe($stdoutObserver);
   
Do Completed
Complete!
    

RxPHP 还具有一个 finally 运算符。

当源在完成或错误上终止时,将调用指定函数。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/finally/finally.php

Rx\Observable::range(1, 3)
    ->finally(function() {
        echo "Finally\n";
    })
    ->subscribe($stdoutObserver);

   
Next value: 1
Next value: 2
Next value: 3
Complete!
Finally
    
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/finally/finally-error.php

Rx\Observable::range(1, 3)
    ->map(function($value) {
        if ($value == 2) {
            throw new \Exception('error');
        }
        return $value;
    })
    ->finally(function() {
        echo "Finally\n";
    })
    ->subscribe($stdoutObserver);

   
Next value: 1
Exception: error
Finally
    

待定