RxGroovy 实现 groupBy
操作符。它返回的 Observable 发出特定 Observable 子类的项目,即 GroupedObservable
。实现 GroupedObservable
接口的对象有一个额外的 getkey
方法,你可以通过它检索用于将项目指定给特定 GroupedObservable
的键。
以下示例代码使用 groupBy
将数字列表转换为两个列表,按数字是否为偶数进行分组
示例代码
def numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);
def groupFunc = { return(0 == (it % 2)); };
numbers.groupBy(groupFunc).flatMap({ it.reduce([it.getKey()], {a, b -> a << b}) }).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
[false, 1, 3, 5, 7, 9]
[true, 2, 4, 6, 8]
Sequence complete
groupBy
的另一个版本允许你传入一个变换函数,该函数在最终 GroupedObservable
发出元素之前更改这些元素。
请注意,当 groupBy
将源 Observable 拆分成一个发出 GroupedObservable
的 Observable 时,这些 GroupedObservable
中的每一个都会开始缓冲它在订阅时将发出的项目。因此,如果你忽略了任何 GroupedObservable
(你既没有订阅它,也没有对其应用订阅它的操作符),这个缓冲区将可能造成内存泄漏。因此,与其忽略你没有兴趣观察的 GroupedObservable
,不如对其应用 take(0)
这样的操作符,作为向它发出信号以丢弃其缓冲区的方式。
如果你退订了某个 GroupedObservable
,或者如果你应用于 GroupedObservable
的 take
之类的操作符从它退订,则该 GroupedObservable
将被终止。如果源 Observable 后来发出一个键与以这种方式被终止的 GroupedObservable
匹配的项目,则 groupBy
将创建并发出一个新的 GroupedObservable
来匹配该键。换句话说,从 GroupedObservable
退订不会导致 groupBy
吞掉其组中的项目。例如,请参见以下代码
示例代码
Observable.range(1,5)
.groupBy({ 0 })
.flatMap({ this.take(1) })
.subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
在上面的代码中,源 Observable 发出序列 { 1 2 3 4 5 }
。当它发出此序列中的第一个项目时,groupBy
操作符创建并发出一个键为 0
的 GroupedObservable
。flatMap
操作符对该 GroupedObservable
应用 take(1)
操作符,这使它得到了它发出的项目(1
)以及从 GroupedObservable
退订,该 GroupedObservable
被终止。当源 Observable 发出其序列中的第二个项目时,groupBy
操作符创建并发出一个第二个键为 0
的 GroupedObservable
来替换被终止的那个。flatMap
再次对这个新的 GroupedObservable
应用 take(1)
来检索要发出的新项目(2
)以及从 GroupedObservable
退订并终止它,并且此过程对源序列中的剩余项目重复进行。
groupBy
默认情况下不会在任何特定的 Scheduler 上运行。