您可以通过使用 TakeLast 运算符修改 Observable,仅发出 Observable 发出的最后 n 个项目,并忽略它们之前的那些项目。
takeLast
next
待定
takeLast takeLastBuffer
您可以通过使用 takeLast(n) 运算符修改 Observable,仅发出 Observable 发出的最后 n 个项目,并忽略它们之前的那些项目。请注意,这将延迟从源 Observable 发出任何项目,直到源 Observable 完成。
takeLast(n)
numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]); numbers.takeLast(2).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
8 9 Sequence complete
此版本的 takeLast 默认情况下不会对任何特定的 Scheduler 进行操作。
takeLast(int)
还有一种 takeLast 变体,它采用时间持续时间而不是项目数量。它仅发出在源 Observable 生命周期的最后持续时间内发出的那些项目。您可以通过将时间长度和此长度表示的时间单位作为参数传递给 takeLast 来设置此持续时间。
请注意,这将延迟从源 Observable 发出任何项目,直到源 Observable 完成。
此版本的 takeLast 默认情况下在 computation Scheduler 上运行,但您也可以选择将您选择的 Scheduler 作为可选的第三个参数传递。
computation
takeLast(long,TimeUnit)
takeLast(long,TimeUnit,Scheduler)
还有一种变体结合了这两种方法。它发出在指定时间窗口内发出的项目数量 或 特定项目数量的最小值。
此版本的 takeLast 默认情况下在 computation Scheduler 上运行,但您也可以选择将您选择的 Scheduler 作为可选的第四个参数传递。
takeLast(int,long,TimeUnit)
takeLast(int,long,TimeUnit,Scheduler)
还有一个名为 takeLastBuffer 的运算符。它与上面描述的 takeLast 的变体集相同,并且仅在行为上不同,它不会单独发出其项目,而是将它们收集到一个单独的 List 中,该 List 作为单个项目发出。
takeLastBuffer
List
takeLastBuffer(int)
takeLastBuffer(long,TimeUnit)
takeLastBuffer(long,TimeUnit,Scheduler)
takeLastBuffer(int,long,TimeUnit)
takeLastBuffer(int,long,TimeUnit,Scheduler)
takeLast takeLastBuffer takeLastBufferWithTime takeLastWithTime
您可以通过使用 takeLast(n) 运算符修改 Observable,仅发出 Observable 发出的最后 n 个项目,并忽略它们之前的那些项目。请注意,这将延迟从源 Observable 发出任何项目,直到该 Observable 完成。
var source = Rx.Observable.range(0, 5) .takeLast(3); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 2 Next: 3 Next: 4 Completed
takeLast 存在于以下每个发行版中
rx.js
rx.alljs
rx.all.compatjs
rx.compat.js
rx.lite.js
rx.lite.compat.js
takeLastWithTime 运算符采用时间持续时间而不是项目数量。它仅发出在源 Observable 生命周期的最后持续时间内发出的那些项目。您可以通过将毫秒数作为参数传递给 takeLastWithTime 来设置此持续时间。
takeLastWithTime
请注意,实现此机制的方式将延迟从源 Observable 发出任何项目,直到该 Observable 完成。
takeLastWithTime 默认情况下在 timeout Scheduler 上运行计时器,并在 currentThread Scheduler 上发出项目,但您也可以选择将您选择的 Scheduler 作为可选的第二和第三个参数传递,分别覆盖它们。
timeout
currentThread
var source = Rx.Observable.timer(0, 1000) .take(10) .takeLastWithTime(5000); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 5 Next: 6 Next: 7 Next: 8 Next: 9 Completed
takeLastWithTime 存在于以下每个发行版中
rx.all.js
rx.all.compat.js
rx.time.js
还有一个名为 takeLastBuffer 的运算符。它与 takeLast 的行为不同,因为它不会单独发出其项目,而是将它们收集到一个单独的项目数组中,该数组作为单个项目发出。
var source = Rx.Observable.range(0, 5) .takeLastBuffer(3); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 2,3,4 Completed
takeLastBuffer 存在于以下每个发行版中
takeLastBuffer 也有其基于持续时间的变体 takeLastBufferWithTime,它类似于 takeLastWithTime,除了它不会单独发出其项目,而是将它们收集到一个单独的项目数组中,该数组作为单个项目发出。
takeLastBufferWithTime
var source = Rx.Observable .timer(0, 1000) .take(10) .takeLastBufferWithTime(5000); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 5,6,7,8,9 Completed
takeLastBufferWithTime 存在于以下每个发行版中
TakeLast
RxPHP 将此运算符实现为 takeLast。
从 Observable 序列的末尾返回指定数量的连续元素。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/take/takeLast.php $source = \Rx\Observable::range(0, 5) ->takeLast(3); $source->subscribe($stdoutObserver);
Next value: 2 Next value: 3 Next value: 4 Complete!
take_last take_last_buffer take_last_with_time
take_last take_last_buffer
tail takeRight