您可以通过使用 TakeLast 运算符修改 Observable,仅发出 Observable 发出的最后 n 个项目,并忽略它们之前的那些项目。
takeLast
待定
您可以通过使用 takeLast(n)
运算符修改 Observable,仅发出 Observable 发出的最后 n 个项目,并忽略它们之前的那些项目。请注意,这将延迟从源 Observable 发出任何项目,直到源 Observable 完成。
numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]); numbers.takeLast(2).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
8 9 Sequence complete
此版本的 takeLast
默认情况下不会对任何特定的 Scheduler 进行操作。
takeLast(int)
还有一种 takeLast
变体,它采用时间持续时间而不是项目数量。它仅发出在源 Observable 生命周期的最后持续时间内发出的那些项目。您可以通过将时间长度和此长度表示的时间单位作为参数传递给 takeLast
来设置此持续时间。
请注意,这将延迟从源 Observable 发出任何项目,直到源 Observable 完成。
此版本的 takeLast
默认情况下在 computation
Scheduler 上运行,但您也可以选择将您选择的 Scheduler 作为可选的第三个参数传递。
takeLast(long,TimeUnit)
takeLast(long,TimeUnit,Scheduler)
还有一种变体结合了这两种方法。它发出在指定时间窗口内发出的项目数量 或 特定项目数量的最小值。
此版本的 takeLast
默认情况下在 computation
Scheduler 上运行,但您也可以选择将您选择的 Scheduler 作为可选的第四个参数传递。
takeLast(int,long,TimeUnit)
takeLast(int,long,TimeUnit,Scheduler)
还有一个名为 takeLastBuffer
的运算符。它与上面描述的 takeLast
的变体集相同,并且仅在行为上不同,它不会单独发出其项目,而是将它们收集到一个单独的 List
中,该 List
作为单个项目发出。
takeLastBuffer(int)
takeLastBuffer(long,TimeUnit)
takeLastBuffer(long,TimeUnit,Scheduler)
takeLastBuffer(int,long,TimeUnit)
takeLastBuffer(int,long,TimeUnit,Scheduler)
您可以通过使用 takeLast(n)
运算符修改 Observable,仅发出 Observable 发出的最后 n 个项目,并忽略它们之前的那些项目。请注意,这将延迟从源 Observable 发出任何项目,直到源 Observable 完成。
此版本的 takeLast
默认情况下不会对任何特定的 Scheduler 进行操作。
takeLast(int)
还有一种 takeLast
变体,它采用时间持续时间而不是项目数量。它仅发出在源 Observable 生命周期的最后持续时间内发出的那些项目。您可以通过将时间长度和此长度表示的时间单位作为参数传递给 takeLast
来设置此持续时间。
请注意,这将延迟从源 Observable 发出任何项目,直到源 Observable 完成。
此版本的 takeLast
默认情况下在 computation
Scheduler 上运行,但您也可以选择将您选择的 Scheduler 作为可选的第三个参数传递。
takeLast(long,TimeUnit)
takeLast(long,TimeUnit,Scheduler)
还有一种变体结合了这两种方法。它发出在指定时间窗口内发出的项目数量 或 特定项目数量的最小值。
此版本的 takeLast
默认情况下在 computation
Scheduler 上运行,但您也可以选择将您选择的 Scheduler 作为可选的第四个参数传递。
takeLast(int,long,TimeUnit)
takeLast(int,long,TimeUnit,Scheduler)
还有一个名为 takeLastBuffer
的运算符。它与上面描述的 takeLast
的变体集相同,并且仅在行为上不同,它不会单独发出其项目,而是将它们收集到一个单独的 List
中,该 List
作为单个项目发出。
takeLastBuffer(int)
takeLastBuffer(long,TimeUnit)
takeLastBuffer(long,TimeUnit,Scheduler)
takeLastBuffer(int,long,TimeUnit)
takeLastBuffer(int,long,TimeUnit,Scheduler)
您可以通过使用 takeLast(n)
运算符修改 Observable,仅发出 Observable 发出的最后 n 个项目,并忽略它们之前的那些项目。请注意,这将延迟从源 Observable 发出任何项目,直到该 Observable 完成。
var source = Rx.Observable.range(0, 5) .takeLast(3); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 2 Next: 3 Next: 4 Completed
takeLast
存在于以下每个发行版中
rx.js
rx.alljs
rx.all.compatjs
rx.compat.js
rx.lite.js
rx.lite.compat.js
takeLastWithTime
运算符采用时间持续时间而不是项目数量。它仅发出在源 Observable 生命周期的最后持续时间内发出的那些项目。您可以通过将毫秒数作为参数传递给 takeLastWithTime
来设置此持续时间。
请注意,实现此机制的方式将延迟从源 Observable 发出任何项目,直到该 Observable 完成。
takeLastWithTime
默认情况下在 timeout
Scheduler 上运行计时器,并在 currentThread
Scheduler 上发出项目,但您也可以选择将您选择的 Scheduler 作为可选的第二和第三个参数传递,分别覆盖它们。
var source = Rx.Observable.timer(0, 1000) .take(10) .takeLastWithTime(5000); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 5 Next: 6 Next: 7 Next: 8 Next: 9 Completed
takeLastWithTime
存在于以下每个发行版中
rx.all.js
rx.all.compat.js
rx.time.js
(需要 rx.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
还有一个名为 takeLastBuffer
的运算符。它与 takeLast
的行为不同,因为它不会单独发出其项目,而是将它们收集到一个单独的项目数组中,该数组作为单个项目发出。
var source = Rx.Observable.range(0, 5) .takeLastBuffer(3); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 2,3,4 Completed
takeLastBuffer
存在于以下每个发行版中
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
takeLastBuffer
也有其基于持续时间的变体 takeLastBufferWithTime
,它类似于 takeLastWithTime
,除了它不会单独发出其项目,而是将它们收集到一个单独的项目数组中,该数组作为单个项目发出。
var source = Rx.Observable .timer(0, 1000) .take(10) .takeLastBufferWithTime(5000); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 5,6,7,8,9 Completed
takeLastBufferWithTime
存在于以下每个发行版中
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.time.js
(需要 rx.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
RxPHP 将此运算符实现为 takeLast
。
从 Observable 序列的末尾返回指定数量的连续元素。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/take/takeLast.php $source = \Rx\Observable::range(0, 5) ->takeLast(3); $source->subscribe($stdoutObserver);
Next value: 2 Next value: 3 Next value: 4 Complete!