FlatMap

将 Observable 发出的项目转换为 Observables,然后将这些 Observables 的发射结果扁平化为单个 Observable。

FlatMap

FlatMap 运算符通过对源 Observable 发出的每个项目应用您指定的函数来转换 Observable,其中该函数返回一个自身发出项目的 Observable。 FlatMap 然后合并这些生成的 Observables 的发射结果,并将这些合并的结果作为其自身的序列发出。

例如,当您有一个 Observable 发出自身具有 Observable 成员或以其他方式可转换为 Observables 的一系列项目时,此方法很有用,这样您就可以创建一个新的 Observable,它发出这些项目的子 Observables 发出的所有项目的完整集合。

请注意,FlatMap 合并 这些 Observables 的发射结果,因此它们可能会交错。

在几种特定于语言的实现中,还存在一个运算符交错来自已转换 Observables 的发射结果,而是严格按顺序发出这些发射结果,通常称为 ConcatMap 或类似名称。

另请参阅

特定于语言的信息

flatMap

RxGroovy 实现 flatMap 运算符。

示例代码

// this closure is an Observable that emits three numbers
numbers   = Observable.from([1, 2, 3]);
// this closure is an Observable that emits two numbers based on what number it is passed
multiples = { n -> Observable.from([ n*2, n*3 ]) };

numbers.flatMap(multiples).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
2
3
4
6
6
9
Sequence complete

请注意,如果由 flatMap 映射到源 Observable 项目的任何单个 Observable 通过调用 onError 终止,则由 flatMap 生成的 Observable 将立即自行终止并调用 onError

此运算符变体的版本接受一个额外的 int 参数。此参数设置 flatMap 将尝试对源 Observable 发出的项目映射到的 Observables 进行的最大并发订阅数。当它达到此最大数时,它将等待其中一个 Observable 终止,然后再订阅另一个。

flatMap

flatMap 的另一个版本为源 Observable 的每个项目和通知创建(并扁平化)一个新的 Observable。

此运算符变体的版本接受一个额外的 int 参数。此参数设置 flatMap 将尝试对源 Observable 发出的项目映射到的 Observables 进行的最大并发订阅数。当它达到此最大数时,它将等待其中一个 Observable 终止,然后再订阅另一个。

flatMap

另一个版本将源 Observable 中的项目与由这些源项目触发的 Observable 组合在一起,并发出这些组合。

此运算符变体的版本接受一个额外的 int 参数。此参数设置 flatMap 将尝试对源 Observable 发出的项目映射到的 Observables 进行的最大并发订阅数。当它达到此最大数时,它将等待其中一个 Observable 终止,然后再订阅另一个。

flatMapIterable

flatMapIterable 变体将源项目和生成的 Iterable 配对,而不是源项目和生成的 Observables,但在其他方面的工作方式基本相同。

concatMap

还存在一个 concatMap 运算符,它类似于 flatMap 运算符的简单版本,但它连接而不是合并生成的 Observables 以生成其自身的序列。

switchMap

RxGroovy 还实现了 switchMap 运算符。它的行为类似于 flatMap,除了每当源 Observable 发出新项目时,它都会取消订阅并停止镜像从先前发出的项目生成的 Observable,并且只开始镜像当前的项目。

flatMap

RxJava 实现 flatMap 运算符。

请注意,如果由 flatMap 映射到源 Observable 项目的任何单个 Observable 通过调用 onError 终止,则由 flatMap 生成的 Observable 将立即自行终止并调用 onError

此运算符变体的版本接受一个额外的 int 参数。此参数设置 flatMap 将尝试对源 Observable 发出的项目映射到的 Observables 进行的最大并发订阅数。当它达到此最大数时,它将等待其中一个 Observable 终止,然后再订阅另一个。

flatMap

flatMap 的另一个版本为源 Observable 的每个项目和通知创建(并扁平化)一个新的 Observable。

此运算符变体的版本接受一个额外的 int 参数。此参数设置 flatMap 将尝试对源 Observable 发出的项目映射到的 Observables 进行的最大并发订阅数。当它达到此最大数时,它将等待其中一个 Observable 终止,然后再订阅另一个。

flatMap

另一个版本将源 Observable 中的项目与由这些源项目触发的 Observable 组合在一起,并发出这些组合。

此运算符变体的版本接受一个额外的 int 参数。此参数设置 flatMap 将尝试对源 Observable 发出的项目映射到的 Observables 进行的最大并发订阅数。当它达到此最大数时,它将等待其中一个 Observable 终止,然后再订阅另一个。

flatMapIterable

flatMapIterable 变体将源项目和生成的 Iterable 配对,而不是源项目和生成的 Observables,但在其他方面的工作方式基本相同。

concatMap

还存在一个 concatMap 运算符,它类似于 flatMap 运算符的简单版本,但它连接而不是合并生成的 Observables 以生成其自身的序列。

switchMap

RxJava 还实现了 switchMap 运算符。它的行为类似于 flatMap,除了每当源 Observable 发出新项目时,它都会取消订阅并停止镜像从先前发出的项目生成的 Observable,并且只开始镜像当前的项目。

RxJS 拥有大量执行 FlatMap 类操作的运算符。在 RxJS 中,将源 Observable 发出的项目转换为 Observables 的函数通常将项目和项目在 Observable 序列中的索引作为参数。

flatMap

RxJS 实现基本 flatMap 运算符。它有一个变体,允许您对为源 Observable 中的每个项目生成的 Observables 发出的项目应用转换函数(flatMap 的可选第二个参数),然后合并并发出这些项目。

如果提供的函数将源 Observables 中的项目转换为 Observables、Promises 或数组,flatMap 也可以正常工作。

selectMany” 是 flatMap 的别名。

示例代码

var source = Rx.Observable
    .range(1, 2)
    .selectMany(function (x) {
        return Rx.Observable.range(x, 2);
    });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 1
Next: 2
Next: 2
Next: 3
Completed
// Using a promise
var source = Rx.Observable.of(1,2,3,4)
    .selectMany(function (x, i) {
        return Promise.resolve(x + i);
    });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 1
Next: 3
Next: 5
Next: 7
Completed
// Using an array
Rx.Observable.of(1,2,3)
  .flatMap(
    function (x, i) { return [x,i]; },
    function (x, y, ix, iy) { return x + y + ix + iy; }
  );

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 2
Next: 2
Next: 5
Next: 5
Next: 8
Next: 8
Completed

flatMap 存在于以下每个发行版中

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
flatMapLatest

flatMapLatest 运算符的行为类似于标准 FlatMap 运算符,除了每当源 Observable 发出新项目时,它都会取消订阅并停止镜像从先前发出的项目生成的 Observable,并且只开始镜像当前的项目。

selectSwitch” 是 flatMapLatest 的别名。

示例代码

var source = Rx.Observable
    .range(1, 2)
    .flatMapLatest(function (x) {
        return Rx.Observable.range(x, 2);
    });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 1
Next: 2
Next: 3
Completed

flatMapLatest 存在于以下每个发行版中

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
flatMapObserver

flatMapObserver 为源 Observable 的每个项目和通知创建(并扁平化)一个新的 Observable。它接受不同的转换函数来响应 onNextonErroronCompleted 通知,并为每个通知返回一个 Observable。

selectManyObserver” 是 flatMapObserver 的别名。

示例代码

var source = Rx.Observable.range(1, 3)
    .flatMapObserver(
        function (x, i) {
            return Rx.Observable.repeat(x, i);
        },
        function (err) {
            return Rx.Observable.return(42);
        },
        function () {
            return Rx.Observable.empty();
        });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 2
Next: 3
Next: 3
Completed

flatMapObserver 存在于以下每个发行版中

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
concatMap

还存在一个 concatMap 运算符,它类似于 flatMap 运算符,但它连接而不是合并生成的 Observables 以生成其自身的序列。

flatMap 一样,如果提供的函数将源 Observables 中的项目转换为 Observables、Promises 或数组,concatMap 也可以正常工作。

selectConcat” 是 concatMap 的别名。

concatMap 存在于以下每个发行版中

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
for

for 运算符(及其别名 forIn)非常类似于 concatMap,尽管它具有相反的灵活性。虽然 concatMap 对 Observable 源进行操作,并且可以使用 Observable、Promise 或数组中间体来生成其输出序列;for 始终使用 Observable 作为其中间体,但可以对 Observable、Promise 或数组作为源进行操作。

concatMap 存在于以下每个发行版中

  • rx.all.js
  • rx.all.compat.js
  • rx.experimental.js(需要 rx.jsrx.compat.jsrx.lite.jsrx.lite.compat.js 之一)
concatMapObserver

还存在一个 concatMapObserver 运算符,它类似于 flatMapObserver 运算符,因为它为源 Observable 的发射结果和终端通知都创建了要合并的 Observables,但它连接而不是合并这些生成的 Observables 以生成其自身的序列。

selectConcatObserver” 是 concatMapObserver 的别名。

concatMapObserver 存在于以下每个发行版中

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
manySelect

manySelect 运算符通常被称为“共态绑定”。如果您能从其中明白,您就不用谢了。否则,这里有一个解释

manySelect 在内部将源 Observable 发出的每个项目转换为一个 Observable,该 Observable 以相同的顺序发出该项目以及源 Observable 随后发出的所有项目。因此,例如,它在内部将发出数字 1、2、3 的 Observable 转换为三个 Observables:一个发出 1、2、3,一个发出 2、3,一个发出 3。

然后,manySelect 将这些 Observables 中的每一个传递给您提供的函数,并将这些函数调用的返回值作为 manySelect 返回的 Observable 的发射结果发出。

通过这种方式,结果 Observable 发出的每个项目都是源 Observable 中对应项目的函数,以及源 Observable 之后发出的所有项目的函数。

manySelect 存在于以下每个发行版中

  • rx.all.js
  • rx.all.compat.js
  • rx.experimental.js

manySelect 需要以下发行版之一

  • rx.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

另请参阅

RxPHP 将此运算符实现为 flatMap

将 observable 序列中的每个元素投影到另一个 observable 序列,并将生成的 observable 序列合并到一个 observable 序列中。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/flatMap/flatMap.php

$observable = Rx\Observable::range(1, 2);

$selectManyObservable = $observable->flatMap(function ($value) {
    return Rx\Observable::range($value, 2);
});

$selectManyObservable->subscribe($stdoutObserver);
   
Next value: 1
Next value: 2
Next value: 2
Next value: 3
Complete!
    

RxPHP 还具有一个 flatMapTo 运算符。

将源 observable 序列中的每个元素投影到另一个 observable 序列,并将生成的 observable 序列合并到一个 observable 序列中。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/concat/concatMapTo.php

$obs = \Rx\Observable::interval(100)
    ->take(3)
    ->mapWithIndex(function ($i) {
        return $i;
    });

$source = Rx\Observable::range(0, 5)
    ->concatMapTo($obs);

$subscription = $source->subscribe($stdoutObserver);

   
Next value: 0
Next value: 1
Next value: 2
Next value: 3
Next value: 4
Next value: 5
Next value: 6
Next value: 7
Next value: 8
Next value: 9
Next value: 10
Next value: 11
Next value: 12
Next value: 13
Next value: 14
Complete!
    

RxPHP 还具有一个 selectMany 运算符。

flatMap 的别名

RxPHP 还具有一个 flatMapLatest 运算符。

跳过可观察序列中的指定数量的元素,然后返回剩余的元素。将可观察序列发出的项转换为可观察序列,并镜像最近转换的可观察序列发出的项。flatMapLatest 运算符类似于上面描述的 flatMap 和 concatMap 方法,但是,它不是发出所有由运算符生成的(通过转换源可观察序列的项)可观察序列发出的项,flatMapLatest 而是仅发出每个这样的转换后的可观察序列发出的项,直到发出下一个这样的可观察序列,然后它会忽略上一个并开始发出新的可观察序列发出的项。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/flatMap/flatMapLatest.php

$source = \Rx\Observable::range(1, 3)
    ->flatMapLatest(function ($x) {
        return \Rx\Observable::fromArray([$x . 'a', $x . 'b']);
    });

$source->subscribe($stdoutObserver);

   
Next value: 1a
Next value: 2a
Next value: 3a
Next value: 3b
Complete!
    

RxPHP 也包含一个 concatMap 运算符。

将可观察序列中的每个元素映射到一个可观察序列,并将生成的这些可观察序列串联成一个可观察序列。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/concat/concatMap.php

$source = Rx\Observable::range(0, 5)
    ->concatMap(function ($x, $i) {
        return \Rx\Observable::interval(100)
            ->take($x)
            ->map(function () use ($i) {
                return $i;
            });
    });

$subscription = $source->subscribe($stdoutObserver);

   
Next value: 1
Next value: 2
Next value: 2
Next value: 3
Next value: 3
Next value: 3
Next value: 4
Next value: 4
Next value: 4
Next value: 4
Complete!
    

RxPHP 也包含一个 concatMapTo 运算符。

将源 observable 序列中的每个元素投影到另一个 observable 序列,并将生成的 observable 序列合并到一个 observable 序列中。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/concat/concatMapTo.php

$obs = \Rx\Observable::interval(100)
    ->take(3)
    ->mapWithIndex(function ($i) {
        return $i;
    });

$source = Rx\Observable::range(0, 5)
    ->concatMapTo($obs);

$subscription = $source->subscribe($stdoutObserver);

   
Next value: 0
Next value: 1
Next value: 2
Next value: 3
Next value: 4
Next value: 5
Next value: 6
Next value: 7
Next value: 8
Next value: 9
Next value: 10
Next value: 11
Next value: 12
Next value: 13
Next value: 14
Complete!