Zip

通过指定的函数将多个 Observables 的发射值组合在一起,并根据该函数的结果为每个组合发射单个项目。

Zip 方法返回一个 Observable,该 Observable 将您选择的函数应用于两个(或更多)其他 Observables 按顺序发射的项目组合,该函数的结果将成为返回的 Observable 发射的项目。它严格按顺序应用此函数,因此新 Observable 发射的第一个项目将是将函数应用于 Observable #1 发射的第一个项目和 Observable #2 发射的第一个项目的结果;新 zip-Observable 发射的第二个项目将是将函数应用于 Observable #1 发射的第二个项目和 Observable #2 发射的第二个项目的结果;等等。它只会发射与发射最少项目的源 Observable 发射的项目数量一样多的项目。

另请参阅

特定语言信息

待定

RxGroovy 将此操作符实现为 zip 的几个变体,以及 zipWith,它是该操作符的实例函数版本。

zip

zip 的最后一个参数是一个函数,它接受每个被压缩的 Observables 的一个项目,并发射一个要由从 zip 返回的 Observable 发射的项目。您可以将要压缩在一起的 Observables 提供给 zip,无论是作为两个到九个单独的参数,还是作为单个参数:一个 Observables 的 Iterable 或一个发射 Observables 的 Observable(如上图所示)。

示例代码

odds  = Observable.from([1, 3, 5, 7, 9]);
evens = Observable.from([2, 4, 6]);

Observable.zip(odds, evens, {o, e -> [o, e]}).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
[1, 2]
[3, 4]
[5, 6]
Sequence complete

请注意,在此示例中,生成的 Observable 在发射三个项目后正常完成,这是两个组件 Observbles(evens,它发射三个项目)中较短的一个发射的项目数量。

zipWith

此操作符的 zipWith 实例版本总是接受两个参数。第一个参数可以是一个简单的 Observable,也可以是一个可迭代对象(如上图所示)。

zipzipWith 默认情况下不会在任何特定的 调度器 上运行。

RxJava 将此操作符实现为 zip 的几个变体,以及 zipWith,它是该操作符的实例函数版本。

zip

zip 的最后一个参数是一个函数,它接受每个被压缩的 Observables 的一个项目,并发射一个要由从 zip 返回的 Observable 发射的项目。您可以将要压缩在一起的 Observables 提供给 zip,无论是作为两个到九个单独的参数,还是作为单个参数:一个 Observables 的 Iterable 或一个发射 Observables 的 Observable(如上图所示)。

zipWith

此操作符的 zipWith 实例版本总是接受两个参数。第一个参数可以是一个简单的 Observable,也可以是一个可迭代对象(如上图所示)。

zipzipWith 默认情况下不会在任何特定的 调度器 上运行。

RxJS 将此操作符实现为 zipzipArray

zip

zip 接受可变数量的 Observables 或 Promises 作为参数,后面跟着一个函数,该函数接受从这些 Observables 发射的每个项目或从这些 Promises 解析的每个项目作为输入,并生成一个要由生成的 Observable 发射的单个项目。

示例代码

/* Using arguments */
var range = Rx.Observable.range(0, 5);

var source = Observable.zip(
    range,
    range.skip(1),
    range.skip(2),
    function (s1, s2, s3) {
        return s1 + ':' + s2 + ':' + s3;
    }
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: 0:1:2
Next: 1:2:3
Next: 2:3:4
Completed
/* Using promises and Observables */
var range = Rx.Observable.range(0, 5);

var source = Observable.zip(
    RSVP.Promise.resolve(0),
    RSVP.Promise.resolve(1),
    Rx.Observable.return(2)
    function (s1, s2, s3) {
        return s1 + ':' + s2 + ':' + s3;
    }
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: 0:1:2
Completed
zipArray

zipArray 接受可变数量的 Observables 作为参数,并返回一个发射数组的 Observable,每个数组都包含来自每个源 Observable 的第 nth 个项目。

示例代码

var range = Rx.Observable.range(0, 5);

var source = Rx.Observable.zipArray(
    range,
    range.skip(1), 
    range.skip(2)
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);   
    },
    function () {
        console.log('Completed');   
    });
Next: [0,1,2]
Next: [1,2,3]
Next: [2,3,4]
Completed
forkJoin

RxJS 还实现了一个类似的操作符 forkJoin。此操作符有两个变体。第一个将每个源 Observables 发射的最后一个元素收集到一个数组中,并将此数组作为其自己的唯一发射的项目发射。您可以将 Observables 列表作为单独的参数或作为 Observables 数组传递给 forkJoin

var source = Rx.Observable.forkJoin(
    Rx.Observable.return(42),
    Rx.Observable.range(0, 10),
    Rx.Observable.fromArray([1,2,3]),
    RSVP.Promise.resolve(56)
);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: [42, 9, 3, 56]
Completed
forkJoin

forkJoin 的第二个变体存在于原型函数中,您可以将其调用到一个源 Observable 的实例上,将另一个源 Observable 作为参数传递给它。作为第二个参数,您将一个函数传递给它,该函数将两个源 Observables 发射的最终项目组合成要由生成的 Observable 发射的唯一项目。

var source1 = Rx.Observable.return(42);
var source2 = Rx.Observable.range(0, 3);

var source = source1.forkJoin(source2, function (s1, s2) {
    return s1 + s2;
});

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 44
Completed

forkJoin 存在于以下发行版中

  • rx.all.js
  • rx.all.compat.js
  • rx.experimental.js(需要 rx.jsrx.compat.jsrx.lite.jsrx.lite.compat.js

待定

RxPHP 将此操作符实现为 zip

使用选择器函数将指定的 Observable 序列合并为一个 Observable 序列,只要所有 Observable 序列在相应的索引处都产生了一个元素。如果省略结果选择器函数,则将生成包含相应索引处 Observable 序列元素的列表。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/zip/zip.php

//Without a result selector
$range = \Rx\Observable::fromArray(range(0, 4));

$source = $range
    ->zip([
        $range->skip(1),
        $range->skip(2)
    ]);

$observer = $createStdoutObserver();

$subscription = $source
    ->subscribe(new CallbackObserver(
        function ($array) use ($observer) {
            $observer->onNext(json_encode($array));
        },
        [$observer, 'onError'],
        [$observer, 'onCompleted']
    ));

   
Next value: [0,1,2]
Next value: [1,2,3]
Next value: [2,3,4]
Complete!
    
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/zip/zip-result-selector.php

//With a result selector
$range = \Rx\Observable::fromArray(range(0, 4));

$source = $range
    ->zip([
        $range->skip(1),
        $range->skip(2)
    ], function ($s1, $s2, $s3) {
        return $s1 . ':' . $s2 . ':' . $s3;
    });

$observer = $createStdoutObserver();

$subscription = $source->subscribe($createStdoutObserver());

   
Next value: 0:1:2
Next value: 1:2:3
Next value: 2:3:4
Complete!
    

RxPHP 还拥有一个 forkJoin 操作符。

并行运行所有 Observable 序列,并收集它们的最后一个元素。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/forkJoin/forkJoin.php

use Rx\Observable;

$obs1 = Observable::range(1, 4);
$obs2 = Observable::range(3, 5);
$obs3 = Observable::fromArray(['a', 'b', 'c']);

$observable = Observable::forkJoin([$obs1, $obs2, $obs3], function($v1, $v2, $v3) {
    return $v1 . $v2 . $v3;
});
$observable->subscribe($stdoutObserver);
   
Next value: 47c
Complete!
    

待定

待定