GroupBy

将一个 Observable 拆分成一组 Observable,每个 Observable 都从原始 Observable 中发出不同的项目子集。

GroupBy

GroupBy 操作符将一个发出项目的 Observable 拆分成一个发出 Observable 的 Observable,其中每个 Observable 都发出原始源 Observable 中的某些项目子集。哪些项目最终落在哪个 Observable 上通常由一个鉴别函数决定,该函数评估每个项目并为其分配一个键。所有具有相同键的项目都由同一个 Observable 发出。

另请参阅

特定语言信息

groupBy

RxGroovy 实现 groupBy 操作符。它返回的 Observable 发出特定 Observable 子类的项目,即 GroupedObservable。实现 GroupedObservable 接口的对象有一个额外的 getkey 方法,你可以通过它检索用于将项目指定给特定 GroupedObservable 的键。

以下示例代码使用 groupBy 将数字列表转换为两个列表,按数字是否为偶数进行分组

示例代码

def numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);
def groupFunc = { return(0 == (it % 2)); };

numbers.groupBy(groupFunc).flatMap({ it.reduce([it.getKey()], {a, b -> a << b}) }).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
[false, 1, 3, 5, 7, 9]
[true, 2, 4, 6, 8]
Sequence complete

groupBy 的另一个版本允许你传入一个变换函数,该函数在最终 GroupedObservable 发出元素之前更改这些元素。

请注意,当 groupBy 将源 Observable 拆分成一个发出 GroupedObservable 的 Observable 时,这些 GroupedObservable 中的每一个都会开始缓冲它在订阅时将发出的项目。因此,如果你忽略了任何 GroupedObservable(你既没有订阅它,也没有对其应用订阅它的操作符),这个缓冲区将可能造成内存泄漏。因此,与其忽略你没有兴趣观察的 GroupedObservable,不如对其应用 take(0) 这样的操作符,作为向它发出信号以丢弃其缓冲区的方式。

如果你退订了某个 GroupedObservable,或者如果你应用于 GroupedObservabletake 之类的操作符从它退订,则该 GroupedObservable 将被终止。如果源 Observable 后来发出一个键与以这种方式被终止的 GroupedObservable 匹配的项目,则 groupBy 将创建并发出一个GroupedObservable 来匹配该键。换句话说,从 GroupedObservable 退订不会导致 groupBy 吞掉其组中的项目。例如,请参见以下代码

示例代码

Observable.range(1,5)
          .groupBy({ 0 })
          .flatMap({ this.take(1) })
          .subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
1
2
3
4
5

在上面的代码中,源 Observable 发出序列 { 1 2 3 4 5 }。当它发出此序列中的第一个项目时,groupBy 操作符创建并发出一个键为 0GroupedObservableflatMap 操作符对该 GroupedObservable 应用 take(1) 操作符,这使它得到了它发出的项目(1)以及从 GroupedObservable 退订,该 GroupedObservable 被终止。当源 Observable 发出其序列中的第二个项目时,groupBy 操作符创建并发出一个第二个键为 0GroupedObservable 来替换被终止的那个。flatMap 再次对这个新的 GroupedObservable 应用 take(1) 来检索要发出的新项目(2)以及从 GroupedObservable 退订并终止它,并且此过程对源序列中的剩余项目重复进行。

groupBy 默认情况下不会在任何特定的 Scheduler 上运行。

groupBy

RxJava 实现 groupBy 操作符。它返回的 Observable 发出特定 Observable 子类的项目,即 GroupedObservable。实现 GroupedObservable 接口的对象有一个额外的 getkey 方法,你可以通过它检索用于将项目指定给特定 GroupedObservable 的键。

groupBy 的另一个版本允许你传入一个变换函数,该函数在最终 GroupedObservable 发出元素之前更改这些元素。

请注意,当 groupBy 将源 Observable 拆分成一个发出 GroupedObservable 的 Observable 时,这些 GroupedObservable 中的每一个都会开始缓冲它在订阅时将发出的项目。因此,如果你忽略了任何 GroupedObservable(你既没有订阅它,也没有对其应用订阅它的操作符),这个缓冲区将可能造成内存泄漏。因此,与其忽略你没有兴趣观察的 GroupedObservable,不如对其应用 take(0) 这样的操作符,作为向它发出信号以丢弃其缓冲区的方式。

如果你退订了某个 GroupedObservable,则该 GroupedObservable 将被终止。如果源 Observable 后来发出一个键与以这种方式被终止的 GroupedObservable 匹配的项目,则 groupBy 将创建并发出一个新的 GroupedObservable 来匹配该键。

groupBy 默认情况下不会在任何特定的 Scheduler 上运行。

groupBy

RxJS 实现 groupBy。它接受一个到三个参数

  1. (必需) 一个函数,它接受源 Observable 中的项目并返回其键
  2. 一个函数,它接受源 Observable 中的项目并返回一个项目,该项目将由生成的 Observable 中的一个发出
  3. 一个函数,用于比较两个键以确定它们是否相同(也就是说,是否具有两个键的项目应在同一个 Observable 上发出)

示例代码

var codes = [
    { keyCode: 38}, // up
    { keyCode: 38}, // up
    { keyCode: 40}, // down
    { keyCode: 40}, // down
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 66}, // b
    { keyCode: 65}  // a
];

var source = Rx.Observable.fromArray(codes)
    .groupBy(
        function (x) { return x.keyCode; },
        function (x) { return x.keyCode; });

var subscription = source.subscribe(
    function (obs) {
        // Print the count
        obs.count().subscribe(function (x) {
            console.log('Count: ' + x);
        });
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Count: 2
Count: 2
Count: 2
Count: 2
Count: 1
Count: 1
Completed

groupBy 在以下每个发行版中都有

  • rx.all.js
  • rx.all.compat.js
  • rx.coincidence.js
groupByUntil

RxJS 还实现 groupByUntil。它监控一个额外的 Observable,并且每当该 Observable 发出一个项目时,它就会关闭它打开的任何键控 Observable(如果源 Observable 发出与键匹配的额外项目,它将打开新的 Observable)。groupByUntil 接受两个到四个参数

  1. (必需) 一个函数,它接受源 Observable 中的项目并返回其键
  2. 一个函数,它接受源 Observable 中的项目并返回一个项目,该项目将由生成的 Observable 中的一个发出
  3. (必需) 一个函数,它返回一个 Observable,该 Observable 的发出触发任何打开的 Observable 的终止
  4. 一个函数,用于比较两个键以确定它们是否相同(也就是说,是否具有两个键的项目应在同一个 Observable 上发出)

示例代码

var codes = [
    { keyCode: 38}, // up
    { keyCode: 38}, // up
    { keyCode: 40}, // down
    { keyCode: 40}, // down
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 66}, // b
    { keyCode: 65}  // a
];

var source = Rx.Observable
    .for(codes, function (x) { return Rx.Observable.return(x).delay(1000); })
    .groupByUntil(
        function (x) { return x.keyCode; },
        function (x) { return x.keyCode; },
        function (x) { return Rx.Observable.timer(2000); });

var subscription = source.subscribe(
    function (obs) {
        // Print the count
        obs.count().subscribe(function (x) { console.log('Count: ' + x); });
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Count: 2
Count: 2
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Completed

groupByUntil 在以下每个发行版中都有

  • rx.all.js
  • rx.all.compat.js
  • rx.coincidence.js

RxPHP 实现此操作符为 groupBy

根据指定的键选择器函数和比较器对可观察序列的元素进行分组,并使用指定的函数选择生成的元素。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/groupBy/groupBy.php

$observable = \Rx\Observable::fromArray([21, 42, 21, 42, 21, 42]);
$observable
    ->groupBy(
        function ($elem) {
            if ($elem === 42) {
                return 0;
            }

            return 1;
        },
        null,
        function ($key) {
            return $key;
        }
    )
    ->subscribe(function ($groupedObserver) use ($createStdoutObserver) {
        $groupedObserver->subscribe($createStdoutObserver($groupedObserver->getKey() . ": "));
    });

   
1: Next value: 21
0: Next value: 42
1: Next value: 21
0: Next value: 42
1: Next value: 21
0: Next value: 42
1: Complete!
0: Complete!
    

RxPHP 还有一个操作符 groupByUntil

根据指定的键选择器函数和比较器对可观察序列的元素进行分组,并使用指定的函数选择生成的元素。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/groupBy/groupByUntil.php

$codes = [
    ['id' => 38],
    ['id' => 38],
    ['id' => 40],
    ['id' => 40],
    ['id' => 37],
    ['id' => 39],
    ['id' => 37],
    ['id' => 39],
    ['id' => 66],
    ['id' => 65]
];

$source = Rx\Observable
    ::fromArray($codes)
    ->concatMap(function ($x) {
        return \Rx\Observable::timer(100)->mapTo($x);
    })
    ->groupByUntil(
        function ($x) {
            return $x['id'];
        },
        function ($x) {
            return $x['id'];
        },
        function ($x) {
            return Rx\Observable::timer(200);
        });

$subscription = $source->subscribe(new CallbackObserver(
    function (\Rx\Observable $obs) {
        // Print the count
        $obs->count()->subscribe(new CallbackObserver(
            function ($x) {
                echo 'Count: ', $x, PHP_EOL;
            }));
    },
    function (Throwable $err) {
        echo 'Error', $err->getMessage(), PHP_EOL;
    },
    function () {
        echo 'Completed', PHP_EOL;
    }));


   
Count: 2
Count: 2
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Completed
    

RxPHP 还有一个操作符 partition

返回两个可观察对象,它们根据给定的函数对源的可观察对象的观测结果进行分区。第一个将触发对谓词返回真值的那些值的观测。第二个将触发对谓词返回假的那些值的观测。谓词对每个已订阅的观察者执行一次。两者还传播源产生的所有错误观测,并且每个在源完成时完成。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/partition/partition.php

list($evens, $odds) = \Rx\Observable::range(0, 10, \Rx\Scheduler::getImmediate())
    ->partition(function ($x) {
        return $x % 2 === 0;
    });

//Because we used the immediate scheduler with range, the subscriptions are not asynchronous.
$evens->subscribe($createStdoutObserver('Evens '));
$odds->subscribe($createStdoutObserver('Odds '));

   
Evens Next value: 0
Evens Next value: 2
Evens Next value: 4
Evens Next value: 6
Evens Next value: 8
Evens Complete!
Odds Next value: 1
Odds Next value: 3
Odds Next value: 5
Odds Next value: 7
Odds Next value: 9
Odds Complete!
    

待定