重试

如果源 Observable 发出错误,则重新订阅它,以期它能够在没有错误的情况下完成。

Retry

The Retry 操作符对来自源 Observable 的 onError 通知做出响应,它不会将该调用传递给它的观察者,而是重新订阅源 Observable 并给它另一个机会在没有错误的情况下完成它的序列。 Retry 始终将 onNext 通知传递给它的观察者,即使来自以错误结束的序列,因此这会导致重复发射(如上图所示)。

另请参阅

特定于语言的信息

RxClojure 未实现 Retry 操作符。

RxCpp 将此操作符实现为 retry

retry

retry 接受一个参数,表示当遇到错误时它应该尝试重新订阅源 Observable 的次数。如果超过此次数,retry 将不会尝试重新订阅,而是将 onError 通知传递给它的观察者。

RxGroovy 有两个版本的此操作符:retryretryWhen

retry

retry 的一个变体不带任何参数。无论它收到多少个 onError 通知,它都会继续重新订阅并镜像源 Observable。

retry 的另一个变体接受一个参数:当遇到错误时,它应该尝试重新订阅源 Observable 的次数。如果超过此次数,retry 将不会再尝试重新订阅,而是将最新的 onError 通知传递给它的观察者。

retry 的第三个变体接受一个谓词函数作为参数。您可以编写此函数以接受两个参数:一个表示到目前为止已执行了多少次重试的整数计数,以及一个表示导致 onError 通知发生的错误的 Throwable。此函数返回一个布尔值,以指示 retry 是否应该重新订阅并镜像源 Observable。如果它没有这样做,那么 retry 将最新的 onError 通知传递给它的观察者。

retry 默认情况下在 trampoline 调度器 上运行。

retryWhen

retryWhen 操作符类似于 retry,但它通过将来自 onError 通知 的 Throwable 传递给生成第二个 Observable 的函数,并观察它的结果来决定是否重新订阅并镜像源 Observable。如果该结果是发射的项目,则 retryWhen 重新订阅并镜像源,该过程重复;如果该结果是 onError 通知,则 retryWhen 将此通知传递给它的观察者并终止。

retryWhen 默认情况下在 trampoline 调度器 上运行,并且还有一个接受调度器作为参数的版本。

RxJava 有两个版本的此操作符:retryretryWhen

retry

retry 的一个变体不带任何参数。无论它收到多少个 onError 通知,它都会继续重新订阅并镜像源 Observable。

retry 的另一个变体接受一个参数:当遇到错误时,它应该尝试重新订阅源 Observable 的次数。如果超过此次数,retry 将不会再尝试重新订阅,而是将最新的 onError 通知传递给它的观察者。

retry 的第三个变体接受一个谓词函数作为参数。您可以编写此函数以接受两个参数:一个表示到目前为止已执行了多少次重试的整数计数,以及一个表示导致 onError 通知发生的错误的 Throwable。此函数返回一个布尔值,以指示 retry 是否应该重新订阅并镜像源 Observable。如果它没有这样做,那么 retry 将最新的 onError 通知传递给它的观察者。

retry 默认情况下在 trampoline 调度器 上运行。

retryWhen

retryWhen 操作符类似于 retry,但它通过将来自 onError 通知 的 Throwable 传递给生成第二个 Observable 的函数,并观察它的结果来决定是否重新订阅并镜像源 Observable。如果该结果是发射的项目,则 retryWhen 重新订阅并镜像源,该过程重复;如果该结果是 onError 通知,则 retryWhen 将此通知传递给它的观察者并终止。

retryWhen 默认情况下在 trampoline 调度器 上运行,并且还有一个接受调度器作为参数的版本。

示例代码

Observable.create((Subscriber<? super String> s) -> {
      System.out.println("subscribing");
      s.onError(new RuntimeException("always fails"));
  }).retryWhen(attempts -> {
      return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
          System.out.println("delay retry by " + i + " second(s)");
          return Observable.timer(i, TimeUnit.SECONDS);
      });
  }).toBlocking().forEach(System.out::println);
subscribing
delay retry by 1 second(s)
subscribing
delay retry by 2 second(s)
subscribing
delay retry by 3 second(s)
subscribing

RxJS 将此操作符实现为 retry

retry

retry 的一个变体不带任何参数。无论它收到多少个 onError 通知,它都会继续重新订阅并镜像源 Observable。

retry 的另一个变体接受一个参数:它应该愿意接受的 onError 通知次数,然后它也会失败并将 onError 传递给它的观察者。例如,retry(2) 表示 retry 将在它第一次收到 onError 通知时重新订阅并镜像源 Observable,但它将在第二次发生时以错误终止。

retry 在以下发行版中找到

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

RxKotlin 有两个版本的此操作符:retryretryWhen

retry

retry 的一个变体不带任何参数。无论它收到多少个 onError 通知,它都会继续重新订阅并镜像源 Observable。

retry 的另一个变体接受一个参数:当遇到错误时,它应该尝试重新订阅源 Observable 的次数。如果超过此次数,retry 将不会再尝试重新订阅,而是将最新的 onError 通知传递给它的观察者。

retry 的第三个变体接受一个谓词函数作为参数。您可以编写此函数以接受两个参数:一个表示到目前为止已执行了多少次重试的整数计数,以及一个表示导致 onError 通知发生的错误的 Throwable。此函数返回一个布尔值,以指示 retry 是否应该重新订阅并镜像源 Observable。如果它没有这样做,那么 retry 将最新的 onError 通知传递给它的观察者。

retryWhen

retryWhen 操作符类似于 retry,但它通过将来自 onError 通知 的 Throwable 传递给生成第二个 Observable 的函数,并观察它的结果来决定是否重新订阅并镜像源 Observable。如果该结果是发射的项目,则 retryWhen 重新订阅并镜像源,该过程重复;如果该结果是 onError 通知,则 retryWhen 将此通知传递给它的观察者并终止。

Rx.NET 将此操作符实现为 Retry

Retry

Retry 的一个变体不带任何参数。无论它收到多少个 onError 通知,它都会继续重新订阅并镜像源 Observable。

Retry 的另一个变体接受一个参数:它应该愿意接受的 onError 通知次数,然后它也会失败并将 onError 传递给它的观察者。例如,Retry(2) 表示 Retry 将在它第一次收到 onError 通知时重新订阅并镜像源 Observable,但它将在第二次发生时以错误终止。

RxPHP 将此操作符实现为 retry

重复源可观察序列指定的次数,直到它成功终止。如果未指定重试计数,则会无限重试。请注意,如果您遇到错误并希望它重试一次,那么您必须使用 ->retry(2)。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/retry/retry.php

$count = 0;

$observable = Rx\Observable::interval(1000)
    ->flatMap(function ($x) use (&$count) {
        if (++$count < 2) {
            return Rx\Observable::error(new \Exception('Something'));
        }
        return Rx\Observable::of(42);
    })
    ->retry(3)
    ->take(1);

$observable->subscribe($stdoutObserver);


   
Next value: 42
Complete!
    

RxPHP 还有一个操作符 retryWhen

当通知程序发出下一个值时,在错误情况下重复源可观察序列。如果源可观察错误并且通知程序完成,它将完成源序列。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/retry/retryWhen.php

$source = Rx\Observable::interval(1000)
    ->map(function ($n) {
        if ($n === 2) {
            throw new Exception();
        }
        return $n;
    })
    ->retryWhen(function (\Rx\Observable $errors) {
        return $errors->delay(200);
    })
    ->take(6);

$subscription = $source->subscribe($createStdoutObserver());

   
Next value: 0
Next value: 1
Next value: 0
Next value: 1
Next value: 0
Next value: 1
Complete!
    

RxPY 将此操作符实现为 retry

retry

retry 接受一个可选参数,表示当遇到错误时它应该尝试重新订阅并镜像源 Observable 的次数。如果超过此次数,retry 将不会尝试重新订阅,而是将 onError 通知传递给它的观察者。如果您省略此参数,retry 将尝试无限期地重新订阅和镜像,无论它收到多少个 onError 通知。

Rx.rb 有两个版本的此操作符:retryretry_infinitely

retry

retry 接受一个可选参数,表示当遇到错误时它应该尝试重新订阅并镜像源 Observable 的次数。如果超过此次数,retry 将不会尝试重新订阅,而是将 onError 通知传递给它的观察者。

另一方面,retryInfinitely 将尝试无限期地重新订阅并镜像源 Observable,无论它收到多少个 onError 通知。

RxScala 有两个版本的此操作符:retryretryWhen

retry

retry 的一个变体不带任何参数。无论它收到多少个 onError 通知,它都会继续重新订阅并镜像源 Observable。

retry 的另一个变体接受一个参数:当遇到错误时,它应该尝试重新订阅源 Observable 的次数。如果超过此次数,retry 将不会再尝试重新订阅,而是将最新的 onError 通知传递给它的观察者。

retry 的第三个变体接受一个谓词函数作为参数。您可以编写此函数以接受两个参数:一个表示到目前为止已执行了多少次重试的 Int 计数,以及一个表示导致 onError 通知发生的错误的 Throwable。此函数返回一个布尔值,以指示 retry 是否应该重新订阅并镜像源 Observable。如果它没有这样做,那么 retry 将最新的 onError 通知传递给它的观察者。

retryWhen

retryWhen 操作符类似于 retry,但它通过将来自 onError 通知 的 Throwable 传递给生成第二个 Observable 的函数,并观察它的结果来决定是否重新订阅并镜像源 Observable。如果该结果是发射的项目,则 retryWhen 重新订阅并镜像源,该过程重复;如果该结果是 onError 通知,则 retryWhen 将此通知传递给它的观察者并终止。