GroupBy 操作符将一个发出项目的 Observable 拆分成一个发出 Observable 的 Observable,其中每个 Observable 都发出原始源 Observable 中的某些项目子集。哪些项目最终落在哪个 Observable 上通常由一个鉴别函数决定,该函数评估每个项目并为其分配一个键。所有具有相同键的项目都由同一个 Observable 发出。
RxGroovy 实现 groupBy
操作符。它返回的 Observable 发出特定 Observable 子类的项目,即 GroupedObservable
。实现 GroupedObservable
接口的对象有一个额外的 getkey
方法,你可以通过它检索用于将项目指定给特定 GroupedObservable
的键。
以下示例代码使用 groupBy
将数字列表转换为两个列表,按数字是否为偶数进行分组
def numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]); def groupFunc = { return(0 == (it % 2)); }; numbers.groupBy(groupFunc).flatMap({ it.reduce([it.getKey()], {a, b -> a << b}) }).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
[false, 1, 3, 5, 7, 9] [true, 2, 4, 6, 8] Sequence complete
groupBy
的另一个版本允许你传入一个变换函数,该函数在最终 GroupedObservable
发出元素之前更改这些元素。
请注意,当 groupBy
将源 Observable 拆分成一个发出 GroupedObservable
的 Observable 时,这些 GroupedObservable
中的每一个都会开始缓冲它在订阅时将发出的项目。因此,如果你忽略了任何 GroupedObservable
(你既没有订阅它,也没有对其应用订阅它的操作符),这个缓冲区将可能造成内存泄漏。因此,与其忽略你没有兴趣观察的 GroupedObservable
,不如对其应用
这样的操作符,作为向它发出信号以丢弃其缓冲区的方式。take(0)
如果你退订了某个 GroupedObservable
,或者如果你应用于 GroupedObservable
的 take
之类的操作符从它退订,则该 GroupedObservable
将被终止。如果源 Observable 后来发出一个键与以这种方式被终止的 GroupedObservable
匹配的项目,则 groupBy
将创建并发出一个新的 GroupedObservable
来匹配该键。换句话说,从 GroupedObservable
退订不会导致 groupBy
吞掉其组中的项目。例如,请参见以下代码
Observable.range(1,5) .groupBy({ 0 }) .flatMap({ this.take(1) }) .subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
1 2 3 4 5
在上面的代码中,源 Observable 发出序列 { 1 2 3 4 5 }
。当它发出此序列中的第一个项目时,groupBy
操作符创建并发出一个键为 0
的 GroupedObservable
。flatMap
操作符对该 GroupedObservable
应用 take(1)
操作符,这使它得到了它发出的项目(1
)以及从 GroupedObservable
退订,该 GroupedObservable
被终止。当源 Observable 发出其序列中的第二个项目时,groupBy
操作符创建并发出一个第二个键为 0
的 GroupedObservable
来替换被终止的那个。flatMap
再次对这个新的 GroupedObservable
应用 take(1)
来检索要发出的新项目(2
)以及从 GroupedObservable
退订并终止它,并且此过程对源序列中的剩余项目重复进行。
groupBy
默认情况下不会在任何特定的 Scheduler 上运行。
groupBy(Func1)
groupBy(Func1,Func1)
RxJava 实现 groupBy
操作符。它返回的 Observable 发出特定 Observable 子类的项目,即 GroupedObservable
。实现 GroupedObservable
接口的对象有一个额外的 getkey
方法,你可以通过它检索用于将项目指定给特定 GroupedObservable
的键。
groupBy
的另一个版本允许你传入一个变换函数,该函数在最终 GroupedObservable
发出元素之前更改这些元素。
请注意,当 groupBy
将源 Observable 拆分成一个发出 GroupedObservable
的 Observable 时,这些 GroupedObservable
中的每一个都会开始缓冲它在订阅时将发出的项目。因此,如果你忽略了任何 GroupedObservable
(你既没有订阅它,也没有对其应用订阅它的操作符),这个缓冲区将可能造成内存泄漏。因此,与其忽略你没有兴趣观察的 GroupedObservable
,不如对其应用
这样的操作符,作为向它发出信号以丢弃其缓冲区的方式。take(0)
如果你退订了某个 GroupedObservable
,则该 GroupedObservable
将被终止。如果源 Observable 后来发出一个键与以这种方式被终止的 GroupedObservable
匹配的项目,则 groupBy
将创建并发出一个新的 GroupedObservable
来匹配该键。
groupBy
默认情况下不会在任何特定的 Scheduler 上运行。
groupBy(Func1)
groupBy(Func1,Func1)
RxJS 实现 groupBy
。它接受一个到三个参数
var codes = [ { keyCode: 38}, // up { keyCode: 38}, // up { keyCode: 40}, // down { keyCode: 40}, // down { keyCode: 37}, // left { keyCode: 39}, // right { keyCode: 37}, // left { keyCode: 39}, // right { keyCode: 66}, // b { keyCode: 65} // a ]; var source = Rx.Observable.fromArray(codes) .groupBy( function (x) { return x.keyCode; }, function (x) { return x.keyCode; }); var subscription = source.subscribe( function (obs) { // Print the count obs.count().subscribe(function (x) { console.log('Count: ' + x); }); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Count: 2 Count: 2 Count: 2 Count: 2 Count: 1 Count: 1 Completed
groupBy
在以下每个发行版中都有
rx.all.js
rx.all.compat.js
rx.coincidence.js
RxJS 还实现 groupByUntil
。它监控一个额外的 Observable,并且每当该 Observable 发出一个项目时,它就会关闭它打开的任何键控 Observable(如果源 Observable 发出与键匹配的额外项目,它将打开新的 Observable)。groupByUntil
接受两个到四个参数
var codes = [ { keyCode: 38}, // up { keyCode: 38}, // up { keyCode: 40}, // down { keyCode: 40}, // down { keyCode: 37}, // left { keyCode: 39}, // right { keyCode: 37}, // left { keyCode: 39}, // right { keyCode: 66}, // b { keyCode: 65} // a ]; var source = Rx.Observable .for(codes, function (x) { return Rx.Observable.return(x).delay(1000); }) .groupByUntil( function (x) { return x.keyCode; }, function (x) { return x.keyCode; }, function (x) { return Rx.Observable.timer(2000); }); var subscription = source.subscribe( function (obs) { // Print the count obs.count().subscribe(function (x) { console.log('Count: ' + x); }); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Count: 2 Count: 2 Count: 1 Count: 1 Count: 1 Count: 1 Count: 1 Count: 1 Completed
groupByUntil
在以下每个发行版中都有
rx.all.js
rx.all.compat.js
rx.coincidence.js
RxPHP 实现此操作符为 groupBy
。
根据指定的键选择器函数和比较器对可观察序列的元素进行分组,并使用指定的函数选择生成的元素。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/groupBy/groupBy.php $observable = \Rx\Observable::fromArray([21, 42, 21, 42, 21, 42]); $observable ->groupBy( function ($elem) { if ($elem === 42) { return 0; } return 1; }, null, function ($key) { return $key; } ) ->subscribe(function ($groupedObserver) use ($createStdoutObserver) { $groupedObserver->subscribe($createStdoutObserver($groupedObserver->getKey() . ": ")); });
1: Next value: 21 0: Next value: 42 1: Next value: 21 0: Next value: 42 1: Next value: 21 0: Next value: 42 1: Complete! 0: Complete!
RxPHP 还有一个操作符 groupByUntil
。
根据指定的键选择器函数和比较器对可观察序列的元素进行分组,并使用指定的函数选择生成的元素。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/groupBy/groupByUntil.php $codes = [ ['id' => 38], ['id' => 38], ['id' => 40], ['id' => 40], ['id' => 37], ['id' => 39], ['id' => 37], ['id' => 39], ['id' => 66], ['id' => 65] ]; $source = Rx\Observable ::fromArray($codes) ->concatMap(function ($x) { return \Rx\Observable::timer(100)->mapTo($x); }) ->groupByUntil( function ($x) { return $x['id']; }, function ($x) { return $x['id']; }, function ($x) { return Rx\Observable::timer(200); }); $subscription = $source->subscribe(new CallbackObserver( function (\Rx\Observable $obs) { // Print the count $obs->count()->subscribe(new CallbackObserver( function ($x) { echo 'Count: ', $x, PHP_EOL; })); }, function (Throwable $err) { echo 'Error', $err->getMessage(), PHP_EOL; }, function () { echo 'Completed', PHP_EOL; }));
Count: 2 Count: 2 Count: 1 Count: 1 Count: 1 Count: 1 Count: 1 Count: 1 Completed
RxPHP 还有一个操作符 partition
。
返回两个可观察对象,它们根据给定的函数对源的可观察对象的观测结果进行分区。第一个将触发对谓词返回真值的那些值的观测。第二个将触发对谓词返回假的那些值的观测。谓词对每个已订阅的观察者执行一次。两者还传播源产生的所有错误观测,并且每个在源完成时完成。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/partition/partition.php list($evens, $odds) = \Rx\Observable::range(0, 10, \Rx\Scheduler::getImmediate()) ->partition(function ($x) { return $x % 2 === 0; }); //Because we used the immediate scheduler with range, the subscriptions are not asynchronous. $evens->subscribe($createStdoutObserver('Evens ')); $odds->subscribe($createStdoutObserver('Odds '));
Evens Next value: 0 Evens Next value: 2 Evens Next value: 4 Evens Next value: 6 Evens Next value: 8 Evens Complete! Odds Next value: 1 Odds Next value: 3 Odds Next value: 5 Odds Next value: 7 Odds Next value: 9 Odds Complete!
待定