Subscribe

对 Observable 发出的数据和通知进行操作

The Subscribe 运算符是连接观察者和 Observable 的粘合剂。为了让观察者能够看到 Observable 发出的数据,或者接收 Observable 的错误或完成通知,它必须首先使用此运算符订阅该 Observable。

The Subscribe 运算符的典型实现可能接受一到三个方法(这些方法构成观察者),或者它可能接受一个对象(有时称为 ObserverSubscriber),该对象实现了包含这三个方法的接口

onNext
每当 Observable 发出数据时,Observable 都会调用此方法。此方法将 Observable 发出的数据作为参数。
onError
Observable 调用此方法以指示它无法生成预期数据或遇到其他错误。这将停止 Observable,它不会再调用 onNextonCompleted。The onError 方法将导致错误的原因作为其参数(有时是一个对象,例如 Exception 或 Throwable,有时是一个简单的字符串,具体取决于实现)。
onCompleted
如果 Observable 没有遇到任何错误,它将在最后一次调用 onNext 之后调用此方法。

如果 Observable 在观察者订阅它之前不会开始发出数据,则称为“冷” Observable;如果 Observable 可以在任何时候开始发出数据,并且订阅者可以在其开始发出数据之后开始观察发射数据的序列,并且错过之前发射的所有数据,则称为“热” Observable。

另请参见

特定语言信息

RxGroovy 实现了几种 subscribe 的变体。

如果传递给它的是没有参数,它将触发对底层 Observable 的订阅,但将忽略其数据和通知。这将激活冷 Observable。

也可以传递一到三个函数;这些函数将被解释为以下内容:

  1. onNext
  2. onNextonError
  3. onNextonErroronCompleted

最后,可以传递一个实现了 ObserverSubscriber 接口的对象。The Observer 接口包含三个前面描述的“on”方法。The Subscriber 接口也实现了这些方法,并添加了一些其他方法来促进响应式拉式背压,并允许 Subscriber 在 Observable 完成之前取消订阅 Observable。

subscribe 的调用返回一个实现了 Subscription 接口的对象。此接口包括 unsubscribe 方法,可以在任何时候调用它以断开 subscribe 在 Observable 和观察者(或代表观察者的方法)之间建立的订阅。

The forEach 运算符是 subscribe 的更简单版本。可以向它们传递一到三个函数,这些函数将被解释为以下内容

  1. onNext
  2. onNextonError
  3. onNextonErroronCompleted

subscribe 不同,forEach 不会返回一个对象,使用它可以取消订阅。也没有选择传递一个具有此功能的参数。因此,只有在绝对需要对 Observable 的所有数据和通知进行操作时才应使用此运算符。

forEach

还有一种名为 forEachBlockingObservable 方法,它与之有些相似。为了使用它,必须首先通过 BlockingObservable.from 方法或 Observable.toBlocking 运算符将源 Observable 转换为 BlockingObservable

BlockingObservable.forEach 将一个函数作为其参数,此函数的行为非常类似于对普通 Observable 的订阅中的 onNext 函数。The forEach 运算符本身会阻塞,直到 BlockingObservable 完成,它通过取消阻塞来指示它已完成,而不是通过调用回调函数。如果遇到错误,它将抛出一个 RuntimeException(而不是调用类似于 onError 回调的函数)。

RxJava 实现了几种 subscribe 的变体。

如果传递给它的是没有参数,它将触发对底层 Observable 的订阅,但将忽略其数据和通知。这将激活冷 Observable。

也可以传递一到三个函数;这些函数将被解释为以下内容:

  1. onNext
  2. onNextonError
  3. onNextonErroronCompleted

最后,可以传递一个实现了 ObserverSubscriber 接口的对象。The Observer 接口包含三个前面描述的“on”方法。The Subscriber 接口也实现了这些方法,并添加了一些其他方法来促进响应式拉式背压,并允许 Subscriber 在 Observable 完成之前取消订阅 Observable。

subscribe 的调用返回一个实现了 Subscription 接口的对象。此接口包括 unsubscribe 方法,可以在任何时候调用它以断开 subscribe 在 Observable 和观察者(或代表观察者的方法)之间建立的订阅。

The forEach 运算符是 subscribe 的更简单版本。可以向它们传递一到三个函数,这些函数将被解释为以下内容

  1. onNext
  2. onNextonError
  3. onNextonErroronCompleted

subscribe 不同,forEach 不会返回一个对象,使用它可以取消订阅。也没有选择传递一个具有此功能的参数。因此,只有在绝对需要对 Observable 的所有数据和通知进行操作时才应使用此运算符。

forEach

还有一种名为 forEachBlockingObservable 方法,它与之有些相似。为了使用它,必须首先通过 BlockingObservable.from 方法或 Observable.toBlocking 运算符将源 Observable 转换为 BlockingObservable

BlockingObservable.forEach 将一个函数作为其参数,此函数的行为非常类似于对普通 Observable 的订阅中的 onNext 函数。The forEach 运算符本身会阻塞,直到 BlockingObservable 完成,它通过取消阻塞来指示它已完成,而不是通过调用回调函数。如果遇到错误,它将抛出一个 RuntimeException(而不是调用类似于 onError 回调的函数)。

在 RxJS 中,可以使用两种方式订阅 Observable

  1. 分别使用 subscribeOnNextsubscribeOnCompletedsubscribeOnError 将单个函数订阅到 Observable 的 onNextonCompletedonError 通知。
  2. 通过将零到三个单独的函数或一个实现了这三个函数的对象传递到 subscribeforEach 运算符中来订阅(这些运算符的行为相同)。

示例代码

var source = Rx.Observable.range(0, 3)

var subscription = source.subscribeOnNext(
  function (x) {
    console.log('Next: %s', x);
  });
Next: 0
Next: 1
Next: 2
var source = Rx.Observable.range(0, 3);

var subscription = source.subscribeOnCompleted(
  function () {
    console.log('Completed');
  });
Completed
var source = Rx.Observable.throw(new Error());

var subscription = source.subscribeOnError(
  function (err) {
    console.log('Error: %s', err);
  });
Error: Error
var observer = Rx.Observer.create(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });

var source = Rx.Observable.range(0, 3)

var subscription = source.subscribe(observer);
Next: 0
Next: 1
Next: 2
Completed
var source = Rx.Observable.range(0, 3)

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });
Next: 0
Next: 1
Next: 2
Completed

本节中描述的函数都可以在以下发行版中找到

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

RxPHP 将此运算符实现为 subscribe