Catch

通过继续序列而无错误来从 onError 通知中恢复

Catch

Catch 运算符会拦截来自源 Observable 的 onError 通知,并不会将其传递给任何观察者,而是将其替换为其他项目或项目序列,从而可能允许生成的 Observable 正常终止或根本不终止。

Catch 运算符有几种变体,并且不同的 ReactiveX 实现使用各种名称来描述此操作,如以下各节所示。

在一些 ReactiveX 实现中,有一个运算符称为“OnErrorResumeNext”,其行为类似于 Catch 变体:具体来说,是对来自源 Observable 的 onError 通知做出反应。在其他实现中,有一个运算符具有相同的名称,但其行为更类似于 Concat 变体:无论源 Observable 是正常终止还是以错误终止,它都执行串联操作。这很不幸且令人困惑,但我们不得不忍受。

另请参阅

特定语言的信息

catch*

RxClojure 将此运算符实现为 catch*。此运算符接受两个参数,它们都是您选择的函数,这些函数将由 onError 引发的异常作为其单个参数。第一个函数是一个谓词。如果它返回 falsecatch* 会将 onError 通知原封不动地传递给其观察者。但是,如果它返回 truecatch* 会吞掉错误,调用第二个函数(它返回一个 Observable),并将来自此新 Observable 的发射和通知传递给其观察者。

您可以用表示各种异常的类对象替换第一个函数参数(评估异常的谓词)。如果您这样做,catch* 将将其视为等效于执行 instance? 检查以查看来自 onError 通知的异常是否是类对象的实例的谓词。换句话说

示例代码

(->> my-observable
  (catch* IllegalArgumentException
          (fn [e] (rx/return 1)))
)

等效于

(->> my-observable
  (catch* (fn [e] (-> instance? IllegalArgumentException e))
          (fn [e] (rx/return 1)))
)

RxCpp 未实现 Catch 运算符。

RxGroovy 以与 RxJava 相同的方式实现 Catch 运算符。有三个不同的运算符提供此功能

onErrorReturn
指示 Observable 在遇到错误时发射特定项目,然后正常终止
onErrorResumeNext
指示 Observable 在遇到错误时开始发射第二个 Observable 序列
onExceptionResumeNext
指示 Observable 在遇到异常(但不包括其他类型的可抛出对象)后继续发射项目

onErrorReturn

onErrorReturn

onErrorReturn 方法返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError,在这种情况下,它不会将该错误传播给观察者,而会改为发射指定项目并调用观察者的 onCompleted 方法,如下面的示例代码所示

示例代码

def myObservable = Observable.create({ aSubscriber ->
  if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('Four');
  if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('Three');
  if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('Two');
  if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('One');
  if(false == aSubscriber.isUnsubscribed()) aSubscriber.onError();
});

myObservable.onErrorReturn({ return('Blastoff!'); }).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
Four
Three
Two
One
Blastoff!
Sequence complete

onErrorResumeNext

onErrorResumeNext

onErrorResumeNext 方法返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError,在这种情况下,它不会将该错误传播给观察者,而会改为开始镜像第二个备份 Observable,如下面的示例代码所示

示例代码

def myObservable = Observable.create({ aSubscriber ->
  if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('Three');
  if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('Two');
  if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('One');
  if(false == aSubscriber.isUnsubscribed()) aSubscriber.onError();
});
def myFallback = Observable.create({ aSubscriber ->
  if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('0');
  if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('1');
  if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('2');
  if(false == aSubscriber.isUnsubscribed()) aSubscriber.onCompleted();
});

myObservable.onErrorResumeNext(myFallback).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
Three
Two
One
0
1
2
Sequence complete

onExceptionResumeNext

onExceptionResumeNext

onErrorResumeNext 方法非常类似,它返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError,在这种情况下,如果传递给 onError 的 Throwable 是一个 Exception,它不会将该 Exception 传播给观察者,而会改为开始镜像第二个备份 Observable。如果 Throwable 不是一个 Exception,onExceptionResumeNext 返回的 Observable 将将其传播给其观察者的 onError 方法,并且不会调用其备份 Observable。

RxJava 使用三个不同的运算符实现 Catch 运算符

onErrorReturn
指示 Observable 在遇到错误时发射特定项目,然后正常终止
onErrorResumeNext
指示 Observable 在遇到错误时开始发射第二个 Observable 序列
onExceptionResumeNext
指示 Observable 在遇到异常(但不包括其他类型的可抛出对象)后继续发射项目

onErrorReturn

onErrorReturn

onErrorReturn 方法返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError,在这种情况下,它不会将该错误传播给观察者,而会改为发射指定项目并调用观察者的 onCompleted 方法。

onErrorResumeNext

onErrorResumeNext

onErrorResumeNext 方法返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError,在这种情况下,它不会将该错误传播给观察者,而会改为开始镜像第二个备份 Observable。

onExceptionResumeNext

onExceptionResumeNext

onErrorResumeNext 方法非常类似,它返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError,在这种情况下,如果传递给 onError 的 Throwable 是一个 Exception,它不会将该 Exception 传播给观察者,而会改为开始镜像第二个备份 Observable。如果 Throwable 不是一个 Exception,onExceptionResumeNext 返回的 Observable 将将其传播给其观察者的 onError 方法,并且不会调用其备份 Observable。

RxJS 使用两个不同的运算符实现 Catch 运算符

catch
指示 Observable 在遇到错误时开始发射第二个 Observable 序列
onErrorResumeNext
指示 Observable 在遇到错误或源 Observable 正常终止时开始发射第二个 Observable 序列

catch

catch

catch 存在于以下发行版中

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

onErrorResumeNext

onErrorResumeNext

此实现借鉴了 Rx.NET 中令人困惑的命名法,其中 onErrorResumeNext 在错误源 Observable 的正常无错误终止时切换到备份 Observable。

onErrorResumeNext 存在于以下发行版中

  • rx.js
  • rx.compat.js

RxKotlin 以与 RxJava 相同的方式实现 Catch 运算符。有三个不同的运算符提供此功能

onErrorReturn
指示 Observable 在遇到错误时发射特定项目,然后正常终止
onErrorResumeNext
指示 Observable 在遇到错误时开始发射第二个 Observable 序列
onExceptionResumeNext
指示 Observable 在遇到异常(但不包括其他类型的可抛出对象)后继续发射项目

onErrorReturn

onErrorReturn

onErrorReturn 方法返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError,在这种情况下,它不会将该错误传播给观察者,而会改为发射指定项目并调用观察者的 onCompleted 方法。

onErrorResumeNext

onErrorResumeNext

onErrorResumeNext 方法返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError,在这种情况下,它不会将该错误传播给观察者,而会改为开始镜像第二个备份 Observable。

onExceptionResumeNext

onExceptionResumeNext

onErrorResumeNext 方法非常类似,它返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError,在这种情况下,如果传递给 onError 的 Throwable 是一个 Exception,它不会将该 Exception 传播给观察者,而会改为开始镜像第二个备份 Observable。如果 Throwable 不是一个 Exception,onExceptionResumeNext 返回的 Observable 将将其传播给其观察者的 onError 方法,并且不会调用其备份 Observable。

Rx.NET 使用两个不同的运算符实现 Catch 运算符

Catch
指示 Observable 在遇到错误时开始发射第二个 Observable 序列
OnErrorResumeNext
指示 Observable 在遇到错误或源 Observable 正常终止时开始发射第二个 Observable 序列

Catch

Catch

Catch 运算符有一个变体,允许您指定要捕获的 Exception 类型。如果您使用该运算符的变体,任何其他 Exception 都将被传递给观察者,就好像没有应用 Catch 运算符一样。

OnErrorResumeNext

OnErrorResumeNext

此实现引入了一种令人困惑的命名法,其中尽管名为 OnErrorResumeNext,它在错误源 Observable 的正常无错误终止时切换到备份 Observable。因此,它更像是一个串联运算符。

RxPHP 将此运算符实现为 catch

继续由异常终止的 observable 序列,使用下一个 observable 序列。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/catch/catch.php

$obs2 = Rx\Observable::of(42);

$source = \Rx\Observable::error(new Exception('Some error'))
    ->catch(function (Throwable $e, \Rx\Observable $sourceObs) use ($obs2) {
        return $obs2;
    });

$subscription = $source->subscribe($stdoutObserver);
   
Next value: 42
Complete!
    

RxPY 使用两个不同的运算符实现 Catch 运算符

catch_exception
指示 Observable,如果遇到错误,则开始发射一组其他 Observable 发射的项目,一次发射一个 Observable,直到其中一个 Observable 成功终止
on_error_resume_next
指示 Observable 串联一组其他 Observable 发射的项目,一次发射一个 Observable,无论源 Observable 或任何后续 Observable 是以错误终止还是正常终止

catch_exception

catch_exception

您可以将一组备份 Observable 传递给 catch_exception,这些 Observable 可以是单个函数参数,也可以是单个 Observable 数组。如果它遇到来自源 Observable 的 onError 通知,它将订阅并开始镜像这些备份 Observable 中的第一个 Observable。如果此备份 Observable 本身发出 onError 通知,catch_exception 将吞掉它并切换到下一个备份 Observable。如果这些 Observable 中的任何一个发出 onCompleted 通知,catch_exception 将将其传递并停止。

on_error_resume_next

on_error_resume_next

您可以将一组备份 Observable 传递给 on_error_resume_next,这些 Observable 可以是单个函数参数,也可以是单个 Observable 数组,或者是一个生成 Observable 的工厂函数。当源 Observable 终止时,无论是以正常方式终止还是以错误方式终止,on_error_resume_next 将订阅并开始镜像这些备份 Observable 中的第一个 Observable,然后递归地继续此串联过程,直到没有更多 Observable 需要镜像,此时它将传递来自这些 Observable 中最后一个 Observable 的 onErroronCompleted 通知。

Rx.rb 使用两个不同的运算符实现 Catch 运算符

rescue_error
指示 Observable 在遇到错误时开始发射来自另一个 Observable 的项目,或者来自由操作返回的 Observable 的项目
on_error_resume_next
指示 Observable 将来自另一个 Observable 的发射的项目串联到源 Observable 发射的序列中,无论源 Observable 是正常终止还是以错误终止

rescue_error

rescue_error

您可以将 Observable 或生成 Observable 的工厂操作传递给 rescue_error

on_error_resume_next

on_error_resume_next

在 Rx.rb 中,on_error_resume_next 继承了来自 Rx.NET 的误导性命名法,因为它无论源序列是正常终止还是以错误终止,都会将第二个 Observable 序列串联到源序列。

Rx.rb 使用四个不同的运算符实现 Catch 运算符

onErrorFlatMap
将来自行为不端 Observable 的所有 onError 通知替换为来自辅助 Observable 的发射
onErrorResumeNext
指示 Observable 在遇到错误时开始发射第二个 Observable 序列
onErrorReturn
指示 Observable 在遇到错误时发射特定项目,然后正常终止
onExceptionResumeNext
指示 Observable 在遇到异常(但不包括其他类型的可抛出对象)后继续发射项目

onErrorFlatMap

onErrorFlatMap

onErrorFlatMap 处理一个特殊情况:一个不符合 Observable 合约 的源 Observable,它可能以这样一种方式将 onError 通知与发射交织在一起,即它可以在不终止的情况下进行交织。此运算符允许您将这些 onError 通知替换为您选择的 Observable 的发射,而无需取消订阅源 Observable,因此来自源的任何未来的发射项目将被传递给观察者,就好像序列没有被 onError 通知中断一样。

由于 onErrorFlatMap 旨在用于在发出错误后不会终止的病态源 Observable,因此它主要在调试/测试场景中很有用。

unintuitive onErrorFlatMap and Merge interaction

请注意,您应该将 onErrorFlatMap 直接应用于病态源 Observable,而不是在它被其他运算符修改后应用于它,因为这些运算符可能会通过在源 Observable 发出错误后立即取消订阅它来有效地重新规范化源 Observable。例如,上面是一个说明如何 onErrorFlatMap 对两个通过 Merge 运算符合并的生成错误的 Observable 做出响应的图示

请注意,onErrorFlatMap 不会对两个 Observable 生成的两个错误做出反应,而只会对由 merge 传递的单个错误做出反应。

onErrorResumeNext

onErrorResumeNext

onErrorResumeNext 方法返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError,在这种情况下,它不会将该错误传播给观察者,而会改为开始镜像第二个备份 Observable。

onErrorReturn

onErrorReturn

onErrorReturn 方法返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError,在这种情况下,它不会将该错误传播给观察者,而会改为发射指定项目并调用观察者的 onCompleted 方法。

onExceptionResumeNext

onExceptionResumeNext

onErrorResumeNext 方法非常类似,它返回一个 Observable,它反映了源 Observable 的行为,除非该 Observable 调用 onError,在这种情况下,如果传递给 onError 的 Throwable 是一个 Exception,它不会将该 Exception 传播给观察者,而会改为开始镜像第二个备份 Observable。如果 Throwable 不是一个 Exception,onExceptionResumeNext 返回的 Observable 将将其传播给其观察者的 onError 方法,并且不会调用其备份 Observable。