FlatMap 运算符通过对源 Observable 发出的每个项目应用您指定的函数来转换 Observable,其中该函数返回一个自身发出项目的 Observable。 FlatMap 然后合并这些生成的 Observables 的发射结果,并将这些合并的结果作为其自身的序列发出。
例如,当您有一个 Observable 发出自身具有 Observable 成员或以其他方式可转换为 Observables 的一系列项目时,此方法很有用,这样您就可以创建一个新的 Observable,它发出这些项目的子 Observables 发出的所有项目的完整集合。
请注意,FlatMap 合并 这些 Observables 的发射结果,因此它们可能会交错。
在几种特定于语言的实现中,还存在一个运算符不交错来自已转换 Observables 的发射结果,而是严格按顺序发出这些发射结果,通常称为 ConcatMap 或类似名称。
RxGroovy 实现 flatMap
运算符。
// this closure is an Observable that emits three numbers numbers = Observable.from([1, 2, 3]); // this closure is an Observable that emits two numbers based on what number it is passed multiples = { n -> Observable.from([ n*2, n*3 ]) }; numbers.flatMap(multiples).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
2 3 4 6 6 9 Sequence complete
请注意,如果由 flatMap
映射到源 Observable 项目的任何单个 Observable 通过调用 onError
终止,则由 flatMap
生成的 Observable 将立即自行终止并调用 onError
。
此运算符变体的版本接受一个额外的 int
参数。此参数设置 flatMap
将尝试对源 Observable 发出的项目映射到的 Observables 进行的最大并发订阅数。当它达到此最大数时,它将等待其中一个 Observable 终止,然后再订阅另一个。
flatMap(Func1)
flatMap(Func1,int)
flatMap
的另一个版本为源 Observable 的每个项目和通知创建(并扁平化)一个新的 Observable。
此运算符变体的版本接受一个额外的 int
参数。此参数设置 flatMap
将尝试对源 Observable 发出的项目映射到的 Observables 进行的最大并发订阅数。当它达到此最大数时,它将等待其中一个 Observable 终止,然后再订阅另一个。
flatMap(Func1,Func1,Func0)
flatMap(Func1,Func1,Func0,int)
另一个版本将源 Observable 中的项目与由这些源项目触发的 Observable 组合在一起,并发出这些组合。
此运算符变体的版本接受一个额外的 int
参数。此参数设置 flatMap
将尝试对源 Observable 发出的项目映射到的 Observables 进行的最大并发订阅数。当它达到此最大数时,它将等待其中一个 Observable 终止,然后再订阅另一个。
flatMap(Func1,Func2)
flatMap(Func1,Func2,int)
flatMapIterable
变体将源项目和生成的 Iterable
配对,而不是源项目和生成的 Observables,但在其他方面的工作方式基本相同。
flatMapIterable(Func1)
flatMapIterable(Func1,Func2)
还存在一个 concatMap
运算符,它类似于 flatMap
运算符的简单版本,但它连接而不是合并生成的 Observables 以生成其自身的序列。
concatMap(Func1)
RxGroovy 还实现了 switchMap
运算符。它的行为类似于 flatMap
,除了每当源 Observable 发出新项目时,它都会取消订阅并停止镜像从先前发出的项目生成的 Observable,并且只开始镜像当前的项目。
switchMap(Func1)
RxJava 实现 flatMap
运算符。
请注意,如果由 flatMap
映射到源 Observable 项目的任何单个 Observable 通过调用 onError
终止,则由 flatMap
生成的 Observable 将立即自行终止并调用 onError
。
此运算符变体的版本接受一个额外的 int
参数。此参数设置 flatMap
将尝试对源 Observable 发出的项目映射到的 Observables 进行的最大并发订阅数。当它达到此最大数时,它将等待其中一个 Observable 终止,然后再订阅另一个。
flatMap(Func1)
flatMap(Func1,int)
flatMap
的另一个版本为源 Observable 的每个项目和通知创建(并扁平化)一个新的 Observable。
此运算符变体的版本接受一个额外的 int
参数。此参数设置 flatMap
将尝试对源 Observable 发出的项目映射到的 Observables 进行的最大并发订阅数。当它达到此最大数时,它将等待其中一个 Observable 终止,然后再订阅另一个。
flatMap(Func1,Func1,Func0)
flatMap(Func1,Func1,Func0,int)
另一个版本将源 Observable 中的项目与由这些源项目触发的 Observable 组合在一起,并发出这些组合。
此运算符变体的版本接受一个额外的 int
参数。此参数设置 flatMap
将尝试对源 Observable 发出的项目映射到的 Observables 进行的最大并发订阅数。当它达到此最大数时,它将等待其中一个 Observable 终止,然后再订阅另一个。
flatMap(Func1,Func2)
flatMap(Func1,Func2,int)
flatMapIterable
变体将源项目和生成的 Iterable
配对,而不是源项目和生成的 Observables,但在其他方面的工作方式基本相同。
flatMapIterable(Func1)
flatMapIterable(Func1,Func2)
还存在一个 concatMap
运算符,它类似于 flatMap
运算符的简单版本,但它连接而不是合并生成的 Observables 以生成其自身的序列。
concatMap(Func1)
RxJava 还实现了 switchMap
运算符。它的行为类似于 flatMap
,除了每当源 Observable 发出新项目时,它都会取消订阅并停止镜像从先前发出的项目生成的 Observable,并且只开始镜像当前的项目。
switchMap(Func1)
RxJS 拥有大量执行 FlatMap 类操作的运算符。在 RxJS 中,将源 Observable 发出的项目转换为 Observables 的函数通常将项目和项目在 Observable 序列中的索引作为参数。
RxJS 实现基本 flatMap
运算符。它有一个变体,允许您对为源 Observable 中的每个项目生成的 Observables 发出的项目应用转换函数(flatMap
的可选第二个参数),然后合并并发出这些项目。
如果提供的函数将源 Observables 中的项目转换为 Observables、Promises 或数组,flatMap
也可以正常工作。
“selectMany
” 是 flatMap
的别名。
var source = Rx.Observable .range(1, 2) .selectMany(function (x) { return Rx.Observable.range(x, 2); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 1 Next: 2 Next: 2 Next: 3 Completed
// Using a promise var source = Rx.Observable.of(1,2,3,4) .selectMany(function (x, i) { return Promise.resolve(x + i); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 1 Next: 3 Next: 5 Next: 7 Completed
// Using an array Rx.Observable.of(1,2,3) .flatMap( function (x, i) { return [x,i]; }, function (x, y, ix, iy) { return x + y + ix + iy; } ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 2 Next: 2 Next: 5 Next: 5 Next: 8 Next: 8 Completed
flatMap
存在于以下每个发行版中
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
flatMapLatest
运算符的行为类似于标准 FlatMap 运算符,除了每当源 Observable 发出新项目时,它都会取消订阅并停止镜像从先前发出的项目生成的 Observable,并且只开始镜像当前的项目。
“selectSwitch
” 是 flatMapLatest
的别名。
var source = Rx.Observable .range(1, 2) .flatMapLatest(function (x) { return Rx.Observable.range(x, 2); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 1 Next: 2 Next: 3 Completed
flatMapLatest
存在于以下每个发行版中
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
flatMapObserver
为源 Observable 的每个项目和通知创建(并扁平化)一个新的 Observable。它接受不同的转换函数来响应 onNext
、onError
和 onCompleted
通知,并为每个通知返回一个 Observable。
“selectManyObserver
” 是 flatMapObserver
的别名。
var source = Rx.Observable.range(1, 3) .flatMapObserver( function (x, i) { return Rx.Observable.repeat(x, i); }, function (err) { return Rx.Observable.return(42); }, function () { return Rx.Observable.empty(); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 2 Next: 3 Next: 3 Completed
flatMapObserver
存在于以下每个发行版中
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
还存在一个 concatMap
运算符,它类似于 flatMap
运算符,但它连接而不是合并生成的 Observables 以生成其自身的序列。
与 flatMap
一样,如果提供的函数将源 Observables 中的项目转换为 Observables、Promises 或数组,concatMap
也可以正常工作。
“selectConcat
” 是 concatMap
的别名。
concatMap
存在于以下每个发行版中
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
for
运算符(及其别名 forIn
)非常类似于 concatMap
,尽管它具有相反的灵活性。虽然 concatMap
对 Observable 源进行操作,并且可以使用 Observable、Promise 或数组中间体来生成其输出序列;for
始终使用 Observable 作为其中间体,但可以对 Observable、Promise 或数组作为源进行操作。
concatMap
存在于以下每个发行版中
rx.all.js
rx.all.compat.js
rx.experimental.js
(需要 rx.js
、rx.compat.js
、rx.lite.js
或 rx.lite.compat.js
之一)还存在一个 concatMapObserver
运算符,它类似于 flatMapObserver
运算符,因为它为源 Observable 的发射结果和终端通知都创建了要合并的 Observables,但它连接而不是合并这些生成的 Observables 以生成其自身的序列。
“selectConcatObserver
” 是 concatMapObserver
的别名。
concatMapObserver
存在于以下每个发行版中
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
manySelect
运算符通常被称为“共态绑定”。如果您能从其中明白,您就不用谢了。否则,这里有一个解释
manySelect
在内部将源 Observable 发出的每个项目转换为一个 Observable,该 Observable 以相同的顺序发出该项目以及源 Observable 随后发出的所有项目。因此,例如,它在内部将发出数字 1、2、3 的 Observable 转换为三个 Observables:一个发出 1、2、3,一个发出 2、3,一个发出 3。
然后,manySelect
将这些 Observables 中的每一个传递给您提供的函数,并将这些函数调用的返回值作为 manySelect
返回的 Observable 的发射结果发出。
通过这种方式,结果 Observable 发出的每个项目都是源 Observable 中对应项目的函数,以及源 Observable 之后发出的所有项目的函数。
manySelect
存在于以下每个发行版中
rx.all.js
rx.all.compat.js
rx.experimental.js
manySelect
需要以下发行版之一
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
RxPHP 将此运算符实现为 flatMap
。
将 observable 序列中的每个元素投影到另一个 observable 序列,并将生成的 observable 序列合并到一个 observable 序列中。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/flatMap/flatMap.php $observable = Rx\Observable::range(1, 2); $selectManyObservable = $observable->flatMap(function ($value) { return Rx\Observable::range($value, 2); }); $selectManyObservable->subscribe($stdoutObserver);
Next value: 1 Next value: 2 Next value: 2 Next value: 3 Complete!
RxPHP 还具有一个 flatMapTo
运算符。
将源 observable 序列中的每个元素投影到另一个 observable 序列,并将生成的 observable 序列合并到一个 observable 序列中。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/concat/concatMapTo.php $obs = \Rx\Observable::interval(100) ->take(3) ->mapWithIndex(function ($i) { return $i; }); $source = Rx\Observable::range(0, 5) ->concatMapTo($obs); $subscription = $source->subscribe($stdoutObserver);
Next value: 0 Next value: 1 Next value: 2 Next value: 3 Next value: 4 Next value: 5 Next value: 6 Next value: 7 Next value: 8 Next value: 9 Next value: 10 Next value: 11 Next value: 12 Next value: 13 Next value: 14 Complete!
RxPHP 还具有一个 selectMany
运算符。
flatMap
的别名
RxPHP 还具有一个 flatMapLatest
运算符。
跳过可观察序列中的指定数量的元素,然后返回剩余的元素。将可观察序列发出的项转换为可观察序列,并镜像最近转换的可观察序列发出的项。flatMapLatest 运算符类似于上面描述的 flatMap 和 concatMap 方法,但是,它不是发出所有由运算符生成的(通过转换源可观察序列的项)可观察序列发出的项,flatMapLatest 而是仅发出每个这样的转换后的可观察序列发出的项,直到发出下一个这样的可观察序列,然后它会忽略上一个并开始发出新的可观察序列发出的项。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/flatMap/flatMapLatest.php $source = \Rx\Observable::range(1, 3) ->flatMapLatest(function ($x) { return \Rx\Observable::fromArray([$x . 'a', $x . 'b']); }); $source->subscribe($stdoutObserver);
Next value: 1a Next value: 2a Next value: 3a Next value: 3b Complete!
RxPHP 也包含一个 concatMap
运算符。
将可观察序列中的每个元素映射到一个可观察序列,并将生成的这些可观察序列串联成一个可观察序列。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/concat/concatMap.php $source = Rx\Observable::range(0, 5) ->concatMap(function ($x, $i) { return \Rx\Observable::interval(100) ->take($x) ->map(function () use ($i) { return $i; }); }); $subscription = $source->subscribe($stdoutObserver);
Next value: 1 Next value: 2 Next value: 2 Next value: 3 Next value: 3 Next value: 3 Next value: 4 Next value: 4 Next value: 4 Next value: 4 Complete!
RxPHP 也包含一个 concatMapTo
运算符。
将源 observable 序列中的每个元素投影到另一个 observable 序列,并将生成的 observable 序列合并到一个 observable 序列中。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/concat/concatMapTo.php $obs = \Rx\Observable::interval(100) ->take(3) ->mapWithIndex(function ($i) { return $i; }); $source = Rx\Observable::range(0, 5) ->concatMapTo($obs); $subscription = $source->subscribe($stdoutObserver);
Next value: 0 Next value: 1 Next value: 2 Next value: 3 Next value: 4 Next value: 5 Next value: 6 Next value: 7 Next value: 8 Next value: 9 Next value: 10 Next value: 11 Next value: 12 Next value: 13 Next value: 14 Complete!