Zip 方法返回一个 Observable,该 Observable 将您选择的函数应用于两个(或更多)其他 Observables 按顺序发射的项目组合,该函数的结果将成为返回的 Observable 发射的项目。它严格按顺序应用此函数,因此新 Observable 发射的第一个项目将是将函数应用于 Observable #1 发射的第一个项目和 Observable #2 发射的第一个项目的结果;新 zip-Observable 发射的第二个项目将是将函数应用于 Observable #1 发射的第二个项目和 Observable #2 发射的第二个项目的结果;等等。它只会发射与发射最少项目的源 Observable 发射的项目数量一样多的项目。
Zip
zip
map mapCat
待定
zip zipWith
RxGroovy 将此操作符实现为 zip 的几个变体,以及 zipWith,它是该操作符的实例函数版本。
zipWith
zip 的最后一个参数是一个函数,它接受每个被压缩的 Observables 的一个项目,并发射一个要由从 zip 返回的 Observable 发射的项目。您可以将要压缩在一起的 Observables 提供给 zip,无论是作为两个到九个单独的参数,还是作为单个参数:一个 Observables 的 Iterable 或一个发射 Observables 的 Observable(如上图所示)。
odds = Observable.from([1, 3, 5, 7, 9]); evens = Observable.from([2, 4, 6]); Observable.zip(odds, evens, {o, e -> [o, e]}).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
[1, 2] [3, 4] [5, 6] Sequence complete
请注意,在此示例中,生成的 Observable 在发射三个项目后正常完成,这是两个组件 Observbles(evens,它发射三个项目)中较短的一个发射的项目数量。
evens
zip(Iterable<Observable>,FuncN)
zip(Observable<Observable>,FuncN)
zip(Observable,Observable,Func2)
此操作符的 zipWith 实例版本总是接受两个参数。第一个参数可以是一个简单的 Observable,也可以是一个可迭代对象(如上图所示)。
zipWith(Observable,Func2)
zipWith(Iterable,Func2)
zip 和 zipWith 默认情况下不会在任何特定的 调度器 上运行。
RxJava 将此操作符实现为 zip 的几个变体,以及 zipWith,它是该操作符的实例函数版本。
zip zipArray zipIterable zipWith
forkJoin zip zipArray
RxJS 将此操作符实现为 zip 和 zipArray。
zipArray
zip 接受可变数量的 Observables 或 Promises 作为参数,后面跟着一个函数,该函数接受从这些 Observables 发射的每个项目或从这些 Promises 解析的每个项目作为输入,并生成一个要由生成的 Observable 发射的单个项目。
/* Using arguments */ var range = Rx.Observable.range(0, 5); var source = Observable.zip( range, range.skip(1), range.skip(2), function (s1, s2, s3) { return s1 + ':' + s2 + ':' + s3; } ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 0:1:2 Next: 1:2:3 Next: 2:3:4 Completed
/* Using promises and Observables */ var range = Rx.Observable.range(0, 5); var source = Observable.zip( RSVP.Promise.resolve(0), RSVP.Promise.resolve(1), Rx.Observable.return(2) function (s1, s2, s3) { return s1 + ':' + s2 + ':' + s3; } ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 0:1:2 Completed
zipArray 接受可变数量的 Observables 作为参数,并返回一个发射数组的 Observable,每个数组都包含来自每个源 Observable 的第 nth 个项目。
var range = Rx.Observable.range(0, 5); var source = Rx.Observable.zipArray( range, range.skip(1), range.skip(2) ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: [0,1,2] Next: [1,2,3] Next: [2,3,4] Completed
RxJS 还实现了一个类似的操作符 forkJoin。此操作符有两个变体。第一个将每个源 Observables 发射的最后一个元素收集到一个数组中,并将此数组作为其自己的唯一发射的项目发射。您可以将 Observables 列表作为单独的参数或作为 Observables 数组传递给 forkJoin。
forkJoin
var source = Rx.Observable.forkJoin( Rx.Observable.return(42), Rx.Observable.range(0, 10), Rx.Observable.fromArray([1,2,3]), RSVP.Promise.resolve(56) ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: [42, 9, 3, 56] Completed
forkJoin 的第二个变体存在于原型函数中,您可以将其调用到一个源 Observable 的实例上,将另一个源 Observable 作为参数传递给它。作为第二个参数,您将一个函数传递给它,该函数将两个源 Observables 发射的最终项目组合成要由生成的 Observable 发射的唯一项目。
var source1 = Rx.Observable.return(42); var source2 = Rx.Observable.range(0, 3); var source = source1.forkJoin(source2, function (s1, s2) { return s1 + s2; }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 44 Completed
forkJoin 存在于以下发行版中
rx.all.js
rx.all.compat.js
rx.experimental.js
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
zip forkJoin
RxPHP 将此操作符实现为 zip。
使用选择器函数将指定的 Observable 序列合并为一个 Observable 序列,只要所有 Observable 序列在相应的索引处都产生了一个元素。如果省略结果选择器函数,则将生成包含相应索引处 Observable 序列元素的列表。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/zip/zip.php //Without a result selector $range = \Rx\Observable::fromArray(range(0, 4)); $source = $range ->zip([ $range->skip(1), $range->skip(2) ]); $observer = $createStdoutObserver(); $subscription = $source ->subscribe(new CallbackObserver( function ($array) use ($observer) { $observer->onNext(json_encode($array)); }, [$observer, 'onError'], [$observer, 'onCompleted'] ));
Next value: [0,1,2] Next value: [1,2,3] Next value: [2,3,4] Complete!
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/zip/zip-result-selector.php //With a result selector $range = \Rx\Observable::fromArray(range(0, 4)); $source = $range ->zip([ $range->skip(1), $range->skip(2) ], function ($s1, $s2, $s3) { return $s1 . ':' . $s2 . ':' . $s3; }); $observer = $createStdoutObserver(); $subscription = $source->subscribe($createStdoutObserver());
Next value: 0:1:2 Next value: 1:2:3 Next value: 2:3:4 Complete!
RxPHP 还拥有一个 forkJoin 操作符。
并行运行所有 Observable 序列,并收集它们的最后一个元素。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/forkJoin/forkJoin.php use Rx\Observable; $obs1 = Observable::range(1, 4); $obs2 = Observable::range(3, 5); $obs3 = Observable::fromArray(['a', 'b', 'c']); $observable = Observable::forkJoin([$obs1, $obs2, $obs3], function($v1, $v2, $v3) { return $v1 . $v2 . $v3; }); $observable->subscribe($stdoutObserver);
Next value: 47c Complete!
zip zip_array
zip zipWith zipWithIndex