Catch 运算符会拦截来自源 Observable 的 onError
通知,并不会将其传递给任何观察者,而是将其替换为其他项目或项目序列,从而可能允许生成的 Observable 正常终止或根本不终止。
Catch 运算符有几种变体,并且不同的 ReactiveX 实现使用各种名称来描述此操作,如以下各节所示。
在一些 ReactiveX 实现中,有一个运算符称为“OnErrorResumeNext”,其行为类似于 Catch 变体:具体来说,是对来自源 Observable 的 onError
通知做出反应。在其他实现中,有一个运算符具有相同的名称,但其行为更类似于 Concat 变体:无论源 Observable 是正常终止还是以错误终止,它都执行串联操作。这很不幸且令人困惑,但我们不得不忍受。
RxClojure 将此运算符实现为 catch*
。此运算符接受两个参数,它们都是您选择的函数,这些函数将由 onError
引发的异常作为其单个参数。第一个函数是一个谓词。如果它返回 false
,catch*
会将 onError
通知原封不动地传递给其观察者。但是,如果它返回 true
,catch*
会吞掉错误,调用第二个函数(它返回一个 Observable),并将来自此新 Observable 的发射和通知传递给其观察者。
您可以用表示各种异常的类对象替换第一个函数参数(评估异常的谓词)。如果您这样做,catch*
将将其视为等效于执行 instance?
检查以查看来自 onError
通知的异常是否是类对象的实例的谓词。换句话说
(->> my-observable (catch* IllegalArgumentException (fn [e] (rx/return 1))) )
等效于
(->> my-observable (catch* (fn [e] (-> instance? IllegalArgumentException e)) (fn [e] (rx/return 1))) )
RxCpp 未实现 Catch 运算符。
RxGroovy 以与 RxJava 相同的方式实现 Catch 运算符。有三个不同的运算符提供此功能
onErrorReturn
onErrorResumeNext
onExceptionResumeNext
onErrorReturn
onErrorReturn
方法返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError
,在这种情况下,它不会将该错误传播给观察者,而会改为发射指定项目并调用观察者的 onCompleted
方法,如下面的示例代码所示
def myObservable = Observable.create({ aSubscriber -> if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('Four'); if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('Three'); if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('Two'); if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('One'); if(false == aSubscriber.isUnsubscribed()) aSubscriber.onError(); }); myObservable.onErrorReturn({ return('Blastoff!'); }).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
Four Three Two One Blastoff! Sequence complete
onErrorReturn(Func1)
onErrorResumeNext
onErrorResumeNext
方法返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError
,在这种情况下,它不会将该错误传播给观察者,而会改为开始镜像第二个备份 Observable,如下面的示例代码所示
def myObservable = Observable.create({ aSubscriber -> if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('Three'); if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('Two'); if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('One'); if(false == aSubscriber.isUnsubscribed()) aSubscriber.onError(); }); def myFallback = Observable.create({ aSubscriber -> if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('0'); if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('1'); if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('2'); if(false == aSubscriber.isUnsubscribed()) aSubscriber.onCompleted(); }); myObservable.onErrorResumeNext(myFallback).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
Three Two One 0 1 2 Sequence complete
onErrorResumeNext(Func1)
onErrorResumeNext(Observable)
onExceptionResumeNext
与 onErrorResumeNext
方法非常类似,它返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError
,在这种情况下,如果传递给 onError
的 Throwable 是一个 Exception,它不会将该 Exception 传播给观察者,而会改为开始镜像第二个备份 Observable。如果 Throwable 不是一个 Exception,onExceptionResumeNext
返回的 Observable 将将其传播给其观察者的 onError
方法,并且不会调用其备份 Observable。
RxJava 使用三个不同的运算符实现 Catch 运算符
onErrorReturn
onErrorResumeNext
onExceptionResumeNext
onErrorReturn
onErrorReturn
方法返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError
,在这种情况下,它不会将该错误传播给观察者,而会改为发射指定项目并调用观察者的 onCompleted
方法。
onErrorReturn(Func1)
onErrorResumeNext
onErrorResumeNext
方法返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError
,在这种情况下,它不会将该错误传播给观察者,而会改为开始镜像第二个备份 Observable。
onErrorResumeNext(Func1)
onErrorResumeNext(Observable)
onExceptionResumeNext
与 onErrorResumeNext
方法非常类似,它返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError
,在这种情况下,如果传递给 onError
的 Throwable 是一个 Exception,它不会将该 Exception 传播给观察者,而会改为开始镜像第二个备份 Observable。如果 Throwable 不是一个 Exception,onExceptionResumeNext
返回的 Observable 将将其传播给其观察者的 onError
方法,并且不会调用其备份 Observable。
RxJS 使用两个不同的运算符实现 Catch 运算符
catch
onErrorResumeNext
catch
catch
存在于以下发行版中
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
onErrorResumeNext
此实现借鉴了 Rx.NET 中令人困惑的命名法,其中 onErrorResumeNext
在错误和源 Observable 的正常无错误终止时切换到备份 Observable。
onErrorResumeNext
存在于以下发行版中
rx.js
rx.compat.js
RxKotlin 以与 RxJava 相同的方式实现 Catch 运算符。有三个不同的运算符提供此功能
onErrorReturn
onErrorResumeNext
onExceptionResumeNext
onErrorReturn
onErrorReturn
方法返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError
,在这种情况下,它不会将该错误传播给观察者,而会改为发射指定项目并调用观察者的 onCompleted
方法。
onErrorResumeNext
onErrorResumeNext
方法返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError
,在这种情况下,它不会将该错误传播给观察者,而会改为开始镜像第二个备份 Observable。
onExceptionResumeNext
与 onErrorResumeNext
方法非常类似,它返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError
,在这种情况下,如果传递给 onError
的 Throwable 是一个 Exception,它不会将该 Exception 传播给观察者,而会改为开始镜像第二个备份 Observable。如果 Throwable 不是一个 Exception,onExceptionResumeNext
返回的 Observable 将将其传播给其观察者的 onError
方法,并且不会调用其备份 Observable。
Rx.NET 使用两个不同的运算符实现 Catch 运算符
Catch
OnErrorResumeNext
Catch
Catch
运算符有一个变体,允许您指定要捕获的 Exception 类型。如果您使用该运算符的变体,任何其他 Exception 都将被传递给观察者,就好像没有应用 Catch
运算符一样。
OnErrorResumeNext
此实现引入了一种令人困惑的命名法,其中尽管名为 OnErrorResumeNext
,它在错误和源 Observable 的正常无错误终止时切换到备份 Observable。因此,它更像是一个串联运算符。
RxPHP 将此运算符实现为 catch
。
继续由异常终止的 observable 序列,使用下一个 observable 序列。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/catch/catch.php $obs2 = Rx\Observable::of(42); $source = \Rx\Observable::error(new Exception('Some error')) ->catch(function (Throwable $e, \Rx\Observable $sourceObs) use ($obs2) { return $obs2; }); $subscription = $source->subscribe($stdoutObserver);
Next value: 42 Complete!
RxPY 使用两个不同的运算符实现 Catch 运算符
catch_exception
on_error_resume_next
catch_exception
您可以将一组备份 Observable 传递给 catch_exception
,这些 Observable 可以是单个函数参数,也可以是单个 Observable 数组。如果它遇到来自源 Observable 的 onError
通知,它将订阅并开始镜像这些备份 Observable 中的第一个 Observable。如果此备份 Observable 本身发出 onError
通知,catch_exception
将吞掉它并切换到下一个备份 Observable。如果这些 Observable 中的任何一个发出 onCompleted
通知,catch_exception
将将其传递并停止。
on_error_resume_next
您可以将一组备份 Observable 传递给 on_error_resume_next
,这些 Observable 可以是单个函数参数,也可以是单个 Observable 数组,或者是一个生成 Observable 的工厂函数。当源 Observable 终止时,无论是以正常方式终止还是以错误方式终止,on_error_resume_next
将订阅并开始镜像这些备份 Observable 中的第一个 Observable,然后递归地继续此串联过程,直到没有更多 Observable 需要镜像,此时它将传递来自这些 Observable 中最后一个 Observable 的 onError
或 onCompleted
通知。
Rx.rb 使用两个不同的运算符实现 Catch 运算符
rescue_error
on_error_resume_next
rescue_error
您可以将 Observable 或生成 Observable 的工厂操作传递给 rescue_error
。
on_error_resume_next
在 Rx.rb 中,on_error_resume_next
继承了来自 Rx.NET 的误导性命名法,因为它无论源序列是正常终止还是以错误终止,都会将第二个 Observable 序列串联到源序列。
Rx.rb 使用四个不同的运算符实现 Catch 运算符
onErrorFlatMap
onError
通知替换为来自辅助 Observable 的发射onErrorResumeNext
onErrorReturn
onExceptionResumeNext
onErrorFlatMap
onErrorFlatMap
处理一个特殊情况:一个不符合 Observable 合约 的源 Observable,它可能以这样一种方式将 onError
通知与发射交织在一起,即它可以在不终止的情况下进行交织。此运算符允许您将这些 onError
通知替换为您选择的 Observable 的发射,而无需取消订阅源 Observable,因此来自源的任何未来的发射项目将被传递给观察者,就好像序列没有被 onError
通知中断一样。
由于 onErrorFlatMap
旨在用于在发出错误后不会终止的病态源 Observable,因此它主要在调试/测试场景中很有用。
请注意,您应该将 onErrorFlatMap
直接应用于病态源 Observable,而不是在它被其他运算符修改后应用于它,因为这些运算符可能会通过在源 Observable 发出错误后立即取消订阅它来有效地重新规范化源 Observable。例如,上面是一个说明如何 onErrorFlatMap
对两个通过 Merge 运算符合并的生成错误的 Observable 做出响应的图示
请注意,onErrorFlatMap
不会对两个 Observable 生成的两个错误做出反应,而只会对由 merge
传递的单个错误做出反应。
onErrorResumeNext
onErrorResumeNext
方法返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError
,在这种情况下,它不会将该错误传播给观察者,而会改为开始镜像第二个备份 Observable。
onErrorReturn
onErrorReturn
方法返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError
,在这种情况下,它不会将该错误传播给观察者,而会改为发射指定项目并调用观察者的 onCompleted
方法。
onExceptionResumeNext
与 onErrorResumeNext
方法非常类似,它返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError
,在这种情况下,如果传递给 onError
的 Throwable 是一个 Exception,它不会将该 Exception 传播给观察者,而会改为开始镜像第二个备份 Observable。如果 Throwable 不是一个 Exception,onExceptionResumeNext
返回的 Observable 将将其传播给其观察者的 onError
方法,并且不会调用其备份 Observable。