您可以注册回调,ReactiveX 将在 Observable 上发生某些事件时调用这些回调,其中这些回调将独立于与 Observable 级联相关的正常通知集进行调用。 各种 ReactiveX 实现设计了各种运算符来允许这样做。
do finally
待定
finally
doOnCompleted doOnEach doOnError doOnNext doOnRequest doOnSubscribe doOnTerminate doOnUnsubscribe finallyDo
RxGroovy 有几个 Do 变体。
doOnEach 运算符允许您建立一个回调,结果 Observable 将在每次它发出一个项目时调用该回调。 您可以以接受 onNext 种类 Notification 作为其唯一参数的 Action 的形式传递此回调,或者您可以传入一个 Observer,其 onNext 方法将被调用,就好像它订阅了 Observable 一样。
doOnEach
onNext
Notification
Action
doOnEach(Action1)
doOnEach(Observer)
doOnNext 运算符非常类似于 doOnEach(Action1),只是您作为参数传递给它的 Action 不会接受 Notification,而是简单地接受发出的项目。
doOnNext
doOnNext(Action1)
doOnRequest 运算符(RxGroovy 1.1 中的新增功能)注册一个 Action,该 Action 将在观察者请求结果 Observable 中的更多项目时被调用。 该 Action 以观察者请求的项目数量作为其参数。
doOnRequest
doOnRequest(Action1)
doOnSubscribe 运算符注册一个 Action,该 Action 将在观察者订阅结果 Observable 时被调用。
doOnSubscribe
doOnSubscribe(Action0)
doOnUnsubscribe 运算符注册一个 Action,该 Action 将在观察者取消订阅结果 Observable 时被调用。
doOnUnsubscribe
doOnUnsubscribe(Action0)
doOnCompleted 运算符注册一个 Action,该 Action 将在结果 Observable 正常终止时被调用,调用 onCompleted。
doOnCompleted
onCompleted
doOnCompleted(Action0)
doOnError 运算符注册一个 Action,该 Action 将在结果 Observable 非正常终止时被调用,调用 onError。 此 Action 将传递表示错误的 Throwable。
doOnError
onError
Throwable
doOnError(Action1)
doOnTerminate 运算符注册一个 Action,该 Action 将在结果 Observable 终止之前被调用,无论正常终止还是错误终止。
doOnTerminate
doOnTerminate(Action0)
finallyDo 运算符注册一个 Action,该 Action 将在结果 Observable 终止之后被调用,无论正常终止还是错误终止。
finallyDo
def numbers = Observable.from([1, 2, 3, 4, 5]); numbers.finallyDo({ println('Finally'); }).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
1 2 3 4 5 Sequence complete Finally
finallyDo(Action0)
doOnCompleted doOnEach doOnError doOnNext doOnRequest doOnSubscribe doOnTerminate doOnUnsubscribe finallyDo doAfterTerminate
RxJava 有几个 Do 变体。
Observable.just(1, 2, 3) .doOnNext(new Action1<Integer>() { @Override public void call(Integer item) { if( item > 1 ) { throw new RuntimeException( "Item exceeds maximum value" ); } } }).subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
Next: 1 Error: Item exceeds maximum value
doOnRequest 运算符(RxJava 1.1 中的新增功能)注册一个 Action,该 Action 将在观察者请求结果 Observable 中的更多项目时被调用。 该 Action 以观察者请求的项目数量作为其参数。
finallyDo 从 RxJava 1.1.1 开始被弃用,取而代之的是具有相同行为的 doAfterTerminate。
doAfterTerminate
doAfterTerminate 运算符注册一个 Action,该 Action 将在结果 Observable 终止之后被调用,无论正常终止还是错误终止。
doAfterTerminate(Action0)
doAfterTerminate doOnComplete doOnDispose doOnEach doOnError doOnLifecycle doOnNext doOnSubscribe doOnTerminate onTerminateDetach
do doOnCompleted doOnError doOnNext finally tap tapOnCompleted tapOnError tapOnNext
RxJS 将基本 Do 运算符实现为 do 或 tap(两个运算符的名称相同)。 您有两个选择可以使用此运算符
do
tap
/* Using an observer */ var observer = Rx.Observer.create( function (x) { console.log('Do Next: %s', x); }, function (err) { console.log('Do Error: %s', err); }, function () { console.log('Do Completed'); } ); var source = Rx.Observable.range(0, 3) .do(observer); var subscription = source.subscribe( function (x) { console.log('Next: %s', x); }, function (err) { console.log('Error: %s', err); }, function () { console.log('Completed'); });
Do Next: 0 Next: 0 Do Next: 1 Next: 1 Do Next: 2 Next: 2 Do Completed Completed
/* Using a function */ var source = Rx.Observable.range(0, 3) .do( function (x) { console.log('Do Next:', x); }, function (err) { console.log('Do Error:', err); }, function () { console.log('Do Completed'); } ); var subscription = source.subscribe( function (x) { console.log('Next: %s', x); }, function (err) { console.log('Error: %s', err); }, function () { console.log('Completed'); });
RxJS 还实现了 doOnNext 或 tapOnNext(两个运算符的名称相同)。 它是 Do 的一种专用形式,它只对 onNext 案例做出响应,通过调用您作为参数提供的回调函数。 您也可以选择传递第二个参数,该参数将是执行回调函数时的回调函数的“this”对象。
tapOnNext
this
var source = Rx.Observable.range(0, 3) .doOnNext( function () { this.log('Do Next: %s', x); }, console ); var subscription = source.subscribe( function (x) { console.log('Next: %s', x); }, function (err) { console.log('Error: %s', err); }, function () { console.log('Completed'); });
Do Next: 0 Next: 0 Do Next: 1 Next: 1 Do Next: 2 Next: 2 Completed
RxJS 还实现了 doOnError 或 tapOnError(两个运算符的名称相同)。 它是 Do 的一种专用形式,它只对 onError 案例做出响应,通过调用您作为参数提供的回调函数。 您也可以选择传递第二个参数,该参数将是执行回调函数时的回调函数的“this”对象。
tapOnError
var source = Rx.Observable.throw(new Error()); .doOnError( function (err) { this.log('Do Error: %s', err); }, console ); var subscription = source.subscribe( function (x) { console.log('Next: %s', x); }, function (err) { console.log('Error: %s', err); }, function () { console.log('Completed'); });
Do Error: Error Error: Error
RxJS 还实现了 doOnCompleted 或 tapOnCompleted(两个运算符的名称相同)。 它是 Do 的一种专用形式,它只对 onCompleted 案例做出响应,通过调用您作为参数提供的回调函数。 您也可以选择传递第二个参数,该参数将是执行回调函数时的回调函数的“this”对象。
tapOnCompleted
var source = Rx.Observable.range(0, 3) .doOnCompleted( function () { this.log('Do Completed'); }, console ); var subscription = source.subscribe( function (x) { console.log('Next: %s', x); }, function (err) { console.log('Error: %s', err); }, function () { console.log('Completed'); });
Next: 0 Next: 1 Next: 2 Do Completed Completed
RxJS 还实现了一个 finally 运算符。 它接受一个函数,该函数将在结果 Observable 终止后被调用,无论正常终止(onCompleted)还是非正常终止(onError)。
var source = Rx.Observable.throw(new Error()) .finally(function () { console.log('Finally'); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Error: Error Finally
do/tap、doOnNext/tapOnNext、doOnError/tapOnError、doOnCompleted/tapOnCompleted 和 finally 在以下每个发行版中都能找到
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
doOnCompleted doOnEach doOnError doOnNext doOnSubscribe doOnTerminate doOnUnsubscribe finallyDo
Do Finally
do doOnError doOnCompleted finally
RxPHP 将此运算符实现为 do。
对可观察序列中的每个元素调用一个操作,并在可观察序列优雅或异常终止时调用一个操作。 此方法可用于调试、日志记录等查询行为,通过拦截消息流来对管道上的消息运行任意操作。 使用 do 时,请务必注意,观察者可能会在流完成或出错后收到更多事件(例如,使用 repeat 或重新订阅时)。 如果您使用的是扩展 AbstractObservable 的 Observable,则不会收到这些事件。 对于这种情况,请使用 DoObserver。 doOnNext、doOnError 和 doOnCompleted 在内部使用 DoObserver,并将收到这些额外的事件。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/do.php $source = \Rx\Observable::range(0, 3) ->do( function ($x) { echo 'Do Next:', $x, PHP_EOL; }, function (Throwable $err) { echo 'Do Error:', $err->getMessage(), PHP_EOL; }, function () { echo 'Do Completed', PHP_EOL; } ); $subscription = $source->subscribe($stdoutObserver);
Do Next:0 Next value: 0 Do Next:1 Next value: 1 Do Next:2 Next value: 2 Do Completed Complete!
RxPHP 还具有一个 doOnError 运算符。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/doOnError.php $source = \Rx\Observable::error(new Exception('Oops')) ->doOnError(function (Throwable $err) { echo 'Do Error:', $err->getMessage(), PHP_EOL; }); $subscription = $source->subscribe($stdoutObserver);
Do Error:Oops Exception: Oops
RxPHP 还具有一个 doOnCompleted 运算符。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/doOnCompleted.php $source = \Rx\Observable::empty() ->doOnCompleted(function () { echo 'Do Completed', PHP_EOL; }); $subscription = $source->subscribe($stdoutObserver);
Do Completed Complete!
RxPHP 还具有一个 finally 运算符。
当源在完成或错误上终止时,将调用指定函数。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/finally/finally.php Rx\Observable::range(1, 3) ->finally(function() { echo "Finally\n"; }) ->subscribe($stdoutObserver);
Next value: 1 Next value: 2 Next value: 3 Complete! Finally
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/finally/finally-error.php Rx\Observable::range(1, 3) ->map(function($value) { if ($value == 2) { throw new \Exception('error'); } return $value; }) ->finally(function() { echo "Finally\n"; }) ->subscribe($stdoutObserver);
Next value: 1 Exception: error Finally
do_action finally_action tap
ensures tap
doOn