合并

通过合并多个可观察对象的发出事件,将它们组合成一个可观察对象

您可以使用 Merge 运算符将多个可观察对象的输出组合在一起,使它们的行为像单个可观察对象一样。

Merge 可能会交织合并的可观察对象发出的项(类似的运算符 Concat 不会交织项,而是在开始发出下一个源可观察对象的项之前依次发出每个源可观察对象的所有项)。

如上图所示,来自任何源可观察对象的 onError 通知将立即传递给观察者,并将终止合并的可观察对象。

MergeDelayError

在许多 ReactiveX 实现中,有一个名为 MergeDelayError 的第二个运算符,它改变了这种行为——保留 onError 通知,直到所有合并的可观察对象都完成,然后才将其传递给观察者

另请参见

特定于语言的信息

在 RxClojure 中,这里有六个需要关注的运算符

merge

merge 将两个或多个可观察对象转换为单个可观察对象,该可观察对象发出所有这些可观察对象发出的所有项。

merge*

merge* 将发出可观察对象的可观察对象转换为单个可观察对象,该可观察对象发出所有发出的可观察对象发出的所有项。

merge-delay-error

merge-delay-errormerge 相似,但即使一个或多个这些可观察对象在发出事件仍在挂起时以 onError 通知终止,它也会发出所有合并的可观察对象的所有项。

merge-delay-error*merge* 的一个类似修改版本。

interleave*

interleavemerge 相似,但它对如何交织来自源可观察对象的项更加刻意:结果可观察对象发出第一个源可观察对象发出的第一个项,然后是第二个源可观察对象发出的第一个项,以此类推,并在到达最后一个源可观察对象后,发出第一个源可观察对象发出的第二个项,第二个源可观察对象发出的第二个项,以此类推,直到所有源可观察对象都终止。

interleave* 类似,但它作用于可观察对象的集合。

RxCpp 将此运算符实现为 merge

merge

RxGroovy 将此运算符实现为 mergemergeWithmergeDelayError

merge

例如,以下代码将 oddsevens 合并到一个可观察对象中。(subscribeOn 运算符使 odds 在与 evens 不同的线程上运行,以便这两个可观察对象可以同时发出项,以演示 Merge 如何交织这些项。)

示例代码

odds  = Observable.from([1, 3, 5, 7]).subscribeOn(someScheduler);
evens = Observable.from([2, 4, 6]);

Observable.merge(odds,evens).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
1
3
2
5
4
7
6
Sequence complete

您还可以将 List<>(或其他可迭代对象)中的可观察对象、可观察对象的数组,甚至发出可观察对象的集合传递给 merge,而不是将多个可观察对象(最多九个)传递给 mergemerge 将它们的输出合并到单个可观察对象的输出中。

merge(List)

如果您传入可观察对象的集合,您可以选择还传入一个值,该值指示 merge 它应该尝试同时订阅的这些可观察对象的最大数量。一旦它达到这个最大订阅计数,它将不再订阅源可观察对象发出的任何其他可观察对象,直到其中一个已订阅的可观察对象发出 onCompleted 通知为止。

merge 的实例版本是 mergeWith,因此,例如,在上面的代码示例中,您可以编写 odds.mergeWith(evens),而不是编写 Observable.merge(odds,evens)

如果传递给 merge 的任何单个可观察对象以 onError 通知终止,则由 merge 生成的可观察对象本身将立即以 onError 通知终止。如果您希望在报告错误之前继续发出剩余的无错误可观察对象的结果,请改用 mergeDelayError

mergeDelayError

mergeDelayError 的行为非常类似于 merge。区别在于,当被合并的可观察对象之一以 onError 通知终止时。如果在 merge 中发生这种情况,合并的可观察对象将立即发出 onError 通知并终止。另一方面,mergeDelayError 将延迟报告错误,直到它让任何其他非错误生成的可观察对象有机会完成发出它们的项,它将自己发出这些项,并且只有在所有其他合并的可观察对象都完成后,才会以 onError 通知终止。

由于可能不止一个合并的可观察对象遇到错误,mergeDelayError 可能会在 onError 通知中传递关于多个错误的信息(它永远不会调用观察者的 onError 方法超过一次)。因此,如果您想知道这些错误的性质,您应该编写观察者的 onError 方法,使其接受 CompositeException 类的参数。

mergeDelayError 的变体更少。您不能将可观察对象的 Iterable 或数组传递给它,但您可以将发出可观察对象的集合或一个到九个单独的可观察对象作为参数传递给它。mergeDelayError 没有像 merge 那样有实例方法版本。

RxJava 将此运算符实现为 mergemergeWithmergeDelayError

merge

示例代码

Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
Observable<Integer> evens = Observable.just(2, 4, 6);

Observable.merge(odds, evens)
          .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });
Next: 1
Next: 3
Next: 5
Next: 2
Next: 4
Next: 6
Sequence complete.

您还可以将 List<>(或其他可迭代对象)中的可观察对象、可观察对象的数组,甚至发出可观察对象的集合传递给 merge,而不是将多个可观察对象(最多九个)传递给 mergemerge 将它们的输出合并到单个可观察对象的输出中。

merge(List)

如果您传入可观察对象的集合,您可以选择还传入一个值,该值指示 merge 它应该尝试同时订阅的这些可观察对象的最大数量。一旦它达到这个最大订阅计数,它将不再订阅源可观察对象发出的任何其他可观察对象,直到其中一个已订阅的可观察对象发出 onCompleted 通知为止。

merge 的实例版本是 mergeWith,因此,例如,您可以编写 odds.mergeWith(evens),而不是编写 Observable.merge(odds,evens)

如果传递给 merge 的任何单个可观察对象以 onError 通知终止,则由 merge 生成的可观察对象本身将立即以 onError 通知终止。如果您希望在报告错误之前继续发出剩余的无错误可观察对象的结果,请改用 mergeDelayError

mergeDelayError

mergeDelayError 的行为非常类似于 merge。区别在于,当被合并的可观察对象之一以 onError 通知终止时。如果在 merge 中发生这种情况,合并的可观察对象将立即发出 onError 通知并终止。另一方面,mergeDelayError 将延迟报告错误,直到它让任何其他非错误生成的可观察对象有机会完成发出它们的项,它将自己发出这些项,并且只有在所有其他合并的可观察对象都完成后,才会以 onError 通知终止。

由于可能不止一个合并的可观察对象遇到错误,mergeDelayError 可能会在 onError 通知中传递关于多个错误的信息(它永远不会调用观察者的 onError 方法超过一次)。因此,如果您想知道这些错误的性质,您应该编写观察者的 onError 方法,使其接受 CompositeException 类的参数。

mergeDelayError 的变体更少。您不能将可观察对象的 Iterable 或数组传递给它,但您可以将发出可观察对象的集合或一个到九个单独的可观察对象作为参数传递给它。mergeDelayError 没有像 merge 那样有实例方法版本。

merge

merge 的第一个变体是一个实例运算符,它将可变数量的可观察对象作为参数,将这些可观察对象的每一个与源(实例)可观察对象合并,以生成其单个输出可观察对象。

merge 的第一个变体位于以下发行版中

  • rx.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

merge 的第二个变体是一个原型(类)运算符,它接受两个参数。第二个参数是发出您要合并的可观察对象的集合。第一个参数是一个数字,指示您希望 merge 在任何时刻尝试订阅的这些发出的可观察对象的最大数量。一旦它达到这个最大订阅计数,它将不再订阅源可观察对象发出的任何其他可观察对象,直到其中一个已订阅的可观察对象发出 onCompleted 通知为止。

merge 的第二个变体位于以下发行版中

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
mergeAll

mergeAllmerge 的第二个变体类似,只是它不允许您设置这个最大订阅计数。它只接受发出可观察对象的集合的单个参数。

mergeAll 位于以下发行版中

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
mergeDelayError

如果传递给 mergemergeAll 的任何单个可观察对象以 onError 通知终止,则结果可观察对象将立即以 onError 通知终止。如果您希望在报告错误之前继续发出剩余的无错误可观察对象的结果,请改用 mergeDelayError

示例代码

var source1 = Rx.Observable.of(1,2,3);
var source2 = Rx.Observable.throwError(new Error('whoops!'));
var source3 = Rx.Observable.of(4,5,6);

var merged = Rx.Observable.mergeDelayError(source1, source2, source3);

var subscription = merged.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); }
  function () { console.log('Completed' } );
1
2
3
4
5
6
Error: Error: whoops!

mergeDelayError 位于以下发行版中

  • rx.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

RxKotlin 将此运算符实现为 mergemergeWithmergeDelayError

merge

您还可以将 List<>(或其他可迭代对象)中的可观察对象、可观察对象的数组,甚至发出可观察对象的集合传递给 merge,而不是将多个可观察对象(最多九个)传递给 mergemerge 将它们的输出合并到单个可观察对象的输出中。

merge(List)

如果您传入可观察对象的集合,您可以选择还传入一个值,该值指示 merge 它应该尝试同时订阅的这些可观察对象的最大数量。一旦它达到这个最大订阅计数,它将不再订阅源可观察对象发出的任何其他可观察对象,直到其中一个已订阅的可观察对象发出 onCompleted 通知为止。

merge 的实例版本是 mergeWith,因此,例如,您可以编写 odds.mergeWith(evens),而不是编写 Observable.merge(odds,evens)

如果传递给 merge 的任何单个可观察对象以 onError 通知终止,则由 merge 生成的可观察对象本身将立即以 onError 通知终止。如果您希望在报告错误之前继续发出剩余的无错误可观察对象的结果,请改用 mergeDelayError

mergeDelayError

mergeDelayError 的行为非常类似于 merge。区别在于,当被合并的可观察对象之一以 onError 通知终止时。如果在 merge 中发生这种情况,合并的可观察对象将立即发出 onError 通知并终止。另一方面,mergeDelayError 将延迟报告错误,直到它让任何其他非错误生成的可观察对象有机会完成发出它们的项,它将自己发出这些项,并且只有在所有其他合并的可观察对象都完成后,才会以 onError 通知终止。

由于可能不止一个合并的可观察对象遇到错误,mergeDelayError 可能会在 onError 通知中传递关于多个错误的信息(它永远不会调用观察者的 onError 方法超过一次)。因此,如果您想知道这些错误的性质,您应该编写观察者的 onError 方法,使其接受 CompositeException 类的参数。

mergeDelayError 的变体更少。您不能将可观察对象的 Iterable 或数组传递给它,但您可以将发出可观察对象的集合或一个到九个单独的可观察对象作为参数传递给它。mergeDelayError 没有像 merge 那样有实例方法版本。

Rx.NET 将此运算符实现为 Merge

Merge

您可以将 Merge 传递给可观察对象的数组、可观察对象的 Enumerable、可观察对象的集合或两个单独的可观察对象。

如果您传递可观察对象的 Enumerable 或集合,您可以选择还传入一个整数,指示它应该尝试同时订阅的这些可观察对象的最大数量。一旦它达到这个最大订阅计数,它将不再订阅源可观察对象发出的任何其他可观察对象,直到其中一个已订阅的可观察对象发出 onCompleted 通知为止。

RxPHP 将此运算符实现为 merge

通过将它们的发出的事件合并到一个可观察对象中,将一个可观察对象与另一个可观察对象组合在一起。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/merge/merge.php

$observable       = Rx\Observable::of(42)->repeat();
$otherObservable  = Rx\Observable::of(21)->repeat();
$mergedObservable = $observable
    ->merge($otherObservable)
    ->take(10);

$disposable = $mergedObservable->subscribe($stdoutObserver);

   
Next value: 42
Next value: 21
Next value: 42
Next value: 21
Next value: 42
Next value: 21
Next value: 42
Next value: 21
Next value: 42
Next value: 21
Complete!
    

RxPHP 还具有运算符 mergeAll

将可观察对象序列合并到一个可观察对象序列中。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/merge/merge-all.php

$sources = Rx\Observable::range(0, 3)
    ->map(function ($x) {
        return Rx\Observable::range($x, 3);
    });

$merged = $sources->mergeAll();

$disposable = $merged->subscribe($stdoutObserver);

   
Next value: 0
Next value: 1
Next value: 1
Next value: 2
Next value: 2
Next value: 2
Next value: 3
Next value: 3
Next value: 4
Complete!
    

RxPY 将此运算符实现为 mergemerge_all/merge_observable

merge

您可以将 merge 传递给一组可观察对象作为单独的参数,或者作为包含这些可观察对象数组的单个参数。

merge_all

merge_all 及其别名 merge_observable 以发出可观察对象的集合作为其单个参数。它们合并所有这些可观察对象的发出的事件以创建它们自己的可观察对象。

Rx.rb 将此运算符实现为 mergemerge_concurrentmerge_all

merge

merge 将第二个可观察对象合并到它正在操作的可观察对象中,以创建一个新的合并后的可观察对象。

merge_concurrent 作用于发出可观察对象的集合,将来自这些可观察对象中每一个的发出的事件合并到它自己的发出事件中。您可以选择将一个整数参数传递给它,指示 merge_concurrent 应该尝试同时订阅这些发出的可观察对象的个数。一旦它达到这个最大订阅计数,它将不再订阅源可观察对象发出的任何其他可观察对象,直到其中一个已订阅的可观察对象发出 onCompleted 通知为止。默认值为 1,这使其等效于 merge_all

merge_all

merge_allmerge_concurrent(1) 相似。它一次订阅每个发射的 Observable,将其发射的值作为自己的值镜像,并在当前 Observable 以 onCompleted 通知终止之前,等待订阅下一个 Observable。在这方面,它更像是一个 Concat 变体。

RxScala 实现这个操作符为 flattenflattenDelayErrormergemergeDelayError

merge

merge 接受第二个 Observable 作为参数,并将该 Observable 与应用 merge 操作符的 Observable 合并,以创建一个新的输出 Observable。

mergeDelayError

mergeDelayErrormerge 相似,但它总是发射两个 Observable 中的所有项目,即使其中一个 Observable 在另一个 Observable 完成发射项目之前以 onError 通知终止。

flatten

flatten 接受一个 Observable 作为参数,该 Observable 发射 Observable。它合并每个发射的 Observable 发射的项目,以创建自己的单个 Observable 序列。此操作符的变体允许您传入一个 Int,指示您希望 flatten 在任何时候尝试订阅的这些发射的 Observable 的最大数量。如果达到此最大订阅计数,它将停止订阅源 Observable 发射的任何其他 Observable,直到已订阅的 Observable 之一发出 onCompleted 通知。

flattenDelayErrorflatten 相似,但它总是发射所有发射的 Observable 中的所有项目,即使这些 Observable 中的一个或多个在其他 Observable 完成发射项目之前以 onError 通知终止。

RxSwift 实现这个操作符为 merge

merge

merge 接受一个 Observable 作为参数,该 Observable 发射 Observable。它合并每个发射的 Observable 发射的项目,以创建自己的单个 Observable 序列。

此操作符的变体 merge(maxConcurrent:) 允许您传入一个 Int,指示您希望 merge 在任何时候尝试订阅的这些发射的 Observable 的最大数量。如果达到此最大订阅计数,它将停止订阅源 Observable 发射的任何其他 Observable,直到已订阅的 Observable 之一发出 onCompleted 通知。

示例代码

let subject1 = PublishSubject()
let subject2 = PublishSubject()

Observable.of(subject1, subject2)
   .merge()
   .subscribe {
       print($0)
   }

subject1.on(.Next(10))
subject1.on(.Next(11))
subject1.on(.Next(12))
subject2.on(.Next(20))
subject2.on(.Next(21))
subject1.on(.Next(14))
subject1.on(.Completed)
subject2.on(.Next(22))
subject2.on(.Completed)
Next(10)
Next(11)
Next(12)
Next(20)
Next(21)
Next(14)
Next(22)
Completed