Buffer

定期将 Observable 发出的项目收集到捆绑包中,然后发出这些捆绑包,而不是一次发出一个项目。

Buffer

The Buffer 操作符将一个发出项目的 Observable 转换为一个发出这些项目的缓冲集合的 Observable。 在 Buffer 的各种特定于语言的实现中,有很多变体,它们在如何选择哪些项目进入哪些缓冲区方面有所不同。

请注意,如果源 Observable 发出 onError 通知,Buffer 会立即传递此通知,而不会先发出它正在组装的缓冲区,即使该缓冲区包含由源 Observable 发出但在发出错误通知之前发出的项目。

The Window 操作符类似于 Buffer,但将项目收集到单独的 Observables 中,而不是在重新发出它们之前收集到数据结构中。

另请参阅

特定于语言的信息

RxCpp 实现 Buffer 的两种变体

buffer(count)

buffer(count)

buffer(count)vector 的形式发出不重叠的缓冲区,每个缓冲区最多包含 count 个来自源 Observable 的项目(最后发出的 vector 可能少于 count 个项目)。

buffer(count, skip)

buffer(count,skip)

buffer(count, skip) 从源 Observable 发出的第一个项目开始创建一个新的缓冲区,并在之后的每 skip 个项目后创建一个新的缓冲区,并用 count 个项目填充每个缓冲区:初始项目和 count-1 个后续项目。 它以 vector 的形式发出这些缓冲区。 具体来说,根据 countskip 的值,这些缓冲区可能会重叠(多个缓冲区可能包含相同的项目),或者它们可能存在间隙(其中源 Observable 发出的项目没有在任何缓冲区中表示)。

在 RxGroovy 中,Buffer 有几种变体

buffer(count)

buffer(count)

buffer(count)List 的形式发出不重叠的缓冲区,每个缓冲区最多包含 count 个来自源 Observable 的项目(最后发出的 List 可能少于 count 个项目)。

buffer(count, skip)

buffer(count,skip)

buffer(count, skip) 从源 Observable 发出的第一个项目开始创建一个新的缓冲区,并在之后的每 skip 个项目后创建一个新的缓冲区,并用 count 个项目填充每个缓冲区:初始项目和 count-1 个后续项目。 它以 List 的形式发出这些缓冲区。 具体来说,根据 countskip 的值,这些缓冲区可能会重叠(多个缓冲区可能包含相同的项目),或者它们可能存在间隙(其中源 Observable 发出的项目没有在任何缓冲区中表示)。

buffer(bufferClosingSelector)

buffer(bufferClosingSelector)

当它订阅源 Observable 时,buffer(bufferClosingSelector) 开始将它的发射收集到 List 中,并且它还调用 bufferClosingSelector 来生成一个第二个 Observable。 当这个第二个 Observable 发出一个 TClosing 对象时,buffer 会发出当前的 List,并重复此过程:开始一个新的 List,并调用 bufferClosingSelector 来创建一个新的 Observable 来监控。 它会一直这样做,直到源 Observable 终止。

buffer(boundary[, initialCapacity])

buffer(boundary)

buffer(boundary) 监控一个 Observable,boundary。 每次该 Observable 发出一个项目时,它都会创建一个新的 List 来开始收集源 Observable 发出的项目,并发出之前的 List

buffer(bufferOpenings, bufferClosingSelector)

buffer(bufferOpenings,bufferClosingSelector)

buffer(bufferOpenings, bufferClosingSelector) 监控一个 Observable,bufferOpenings,它会发出 BufferOpening 对象。 每当它观察到这样的发出项目时,它都会创建一个新的 List 来开始收集源 Observable 发出的项目,并将 bufferOpenings Observable 传递给 closingSelector 函数。 该函数返回一个 Observable。 buffer 监控该 Observable,当它检测到来自它的发出项目时,它会关闭 List 并将其作为自己的发射发出。

buffer(timespan, unit[, scheduler])

buffer(timespan,unit)

buffer(timespan, unit) 定期发出一个新的 List 项目,每 timespan 时间量发出一次,包含自上次捆绑发射以来或在第一个捆绑的情况下,自订阅源 Observable 以来源 Observable 发出的所有项目。 此操作符变体还有一个版本,它将 Scheduler 作为参数并使用它来控制时间跨度;默认情况下,此变体使用 computation Scheduler。

buffer(timespan, unit, count[, scheduler])

buffer(timespan,unit,count)

buffer(timespan, unit, count) 每当源 Observable 发出 count 个项目时,都会发出一个新的 List 项目,或者,如果自上次捆绑发射以来已经过去了 timespan,它会发出一个捆绑包,其中包含源 Observable 在该跨度内发出的项目数量,即使少于 count 个项目。 此操作符变体还有一个版本,它将 Scheduler 作为参数并使用它来控制时间跨度;默认情况下,此变体使用 computation Scheduler。

buffer(timespan, timeshift, unit[, scheduler])

buffer(timespan,timeshift,unit)

buffer(timespan, timeshift, unit)timeshift 时间段创建一个新的 List 项目,并用从该时间到自捆绑包创建以来过去了 timespan 时间为止源 Observable 发出的所有项目填充这个捆绑包,然后发出这个 List 作为它自己的发射。 如果 timespan 长于 timeshift,则发出的捆绑包将表示重叠的时间段,因此它们可能包含重复的项目。 此操作符变体还有一个版本,它将 Scheduler 作为参数并使用它来控制时间跨度;默认情况下,此变体使用 computation Scheduler。

您可以使用 Buffer 操作符来实现背压(即,处理可能以其观察者无法消耗的速度发出项目的 Observable)。

Buffer as a backpressure strategy

Buffer 可以将许多项目的序列减少到更少的缓冲区项目序列,使其更易于管理。 例如,您可以定期以规律的时间间隔关闭并发出来自突发 Observable 的项目缓冲区。

示例代码

Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
Buffer as a backpressure strategy

或者,您可以花哨一些,在突发期间收集项目到缓冲区,并在每次突发结束时发出它们,方法是使用 Debounce 操作符向缓冲区操作符发出缓冲区关闭指示器。

示例代码

// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);

在 RxJava 中,Buffer 有几种变体

buffer(count)

buffer(count)

buffer(count)List 的形式发出不重叠的缓冲区,每个缓冲区最多包含 count 个来自源 Observable 的项目(最后发出的 List 可能少于 count 个项目)。

buffer(count, skip)

buffer(count,skip)

buffer(count, skip) 从源 Observable 发出的第一个项目开始创建一个新的缓冲区,并在之后的每 skip 个项目后创建一个新的缓冲区,并用 count 个项目填充每个缓冲区:初始项目和 count-1 个后续项目。 它以 List 的形式发出这些缓冲区。 具体来说,根据 countskip 的值,这些缓冲区可能会重叠(多个缓冲区可能包含相同的项目),或者它们可能存在间隙(其中源 Observable 发出的项目没有在任何缓冲区中表示)。

buffer(bufferClosingSelector)

buffer(bufferClosingSelector)

当它订阅源 Observable 时,buffer(bufferClosingSelector) 开始将它的发射收集到 List 中,并且它还调用 bufferClosingSelector 来生成一个第二个 Observable。 当这个第二个 Observable 发出一个 TClosing 对象时,buffer 会发出当前的 List,并重复此过程:开始一个新的 List,并调用 bufferClosingSelector 来创建一个新的 Observable 来监控。 它会一直这样做,直到源 Observable 终止。

buffer(boundary)

buffer(boundary)

buffer(boundary) 监控一个 Observable,boundary。 每次该 Observable 发出一个项目时,它都会创建一个新的 List 来开始收集源 Observable 发出的项目,并发出之前的 List

buffer(bufferOpenings, bufferClosingSelector)

buffer(bufferOpenings,bufferClosingSelector)

buffer(bufferOpenings, bufferClosingSelector) 监控一个 Observable,bufferOpenings,它会发出 BufferOpening 对象。 每当它观察到这样的发出项目时,它都会创建一个新的 List 来开始收集源 Observable 发出的项目,并将 bufferOpenings Observable 传递给 closingSelector 函数。 该函数返回一个 Observable。 buffer 监控该 Observable,当它检测到来自它的发出项目时,它会关闭 List 并将其作为自己的发射发出。

buffer(timespan, unit[, scheduler])

buffer(timespan,unit)

buffer(timespan, unit) 定期发出一个新的 List 项目,每 timespan 时间量发出一次,包含自上次捆绑发射以来或在第一个捆绑的情况下,自订阅源 Observable 以来源 Observable 发出的所有项目。 此操作符变体还有一个版本,它将 Scheduler 作为参数并使用它来控制时间跨度;默认情况下,此变体使用 computation Scheduler。

buffer(timespan, unit, count[, scheduler])

buffer(timespan,unit,count)

buffer(timespan, unit, count) 每当源 Observable 发出 count 个项目时,都会发出一个新的 List 项目,或者,如果自上次捆绑发射以来已经过去了 timespan,它会发出一个捆绑包,其中包含源 Observable 在该跨度内发出的项目数量,即使少于 count 个项目。 此操作符变体还有一个版本,它将 Scheduler 作为参数并使用它来控制时间跨度;默认情况下,此变体使用 computation Scheduler。

buffer(timespan, timeshift, unit[, scheduler])

buffer(timespan,timeshift,unit)

buffer(timespan, timeshift, unit)timeshift 时间段创建一个新的 List 项目,并用从该时间到自捆绑包创建以来过去了 timespan 时间为止源 Observable 发出的所有项目填充这个捆绑包,然后发出这个 List 作为它自己的发射。 如果 timespan 长于 timeshift,则发出的捆绑包将表示重叠的时间段,因此它们可能包含重复的项目。 此操作符变体还有一个版本,它将 Scheduler 作为参数并使用它来控制时间跨度;默认情况下,此变体使用 computation Scheduler。

您可以使用 Buffer 操作符来实现背压(即,处理可能以其观察者无法消耗的速度发出项目的 Observable)。

Buffer as a backpressure strategy

Buffer 可以将许多项目的序列减少到更少的缓冲区项目序列,使其更易于管理。 例如,您可以定期以规律的时间间隔关闭并发出来自突发 Observable 的项目缓冲区。

示例代码

Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
Buffer as a backpressure strategy

或者,您可以花哨一些,在突发期间收集项目到缓冲区,并在每次突发结束时发出它们,方法是使用 Debounce 操作符向缓冲区操作符发出缓冲区关闭指示器。

示例代码

// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);

另请参阅

RxJS 有四个 Buffer 操作符 — bufferbufferWithCountbufferWithTimebufferWithTimeOrCount — 每个操作符都有变体,这些变体在控制哪些源 Observable 项目作为哪些缓冲区的一部分发出方面有不同的方式。

buffer(bufferBoundaries)

buffer(bufferBoundaries)

buffer(bufferBoundaries) 监控一个 Observable,bufferBoundaries。 每次该 Observable 发出一个项目时,它都会创建一个新的集合来开始收集源 Observable 发出的项目,并发出之前的集合。

buffer(bufferClosingSelector)

buffer(bufferClosingSelector)

当它订阅源 Observable 时,buffer(bufferClosingSelector) 开始将它的发射收集到一个集合中,并且它还调用 bufferClosingSelector 来生成一个第二个 Observable。 当这个第二个 Observable 发出一个项目时,buffer 会发出当前的集合,并重复此过程:开始一个新的集合,并调用 bufferClosingSelector 来创建一个新的 Observable 来监控。 它会一直这样做,直到源 Observable 终止。

buffer(bufferOpenings,bufferClosingSelector)

buffer(bufferOpenings,bufferClosingSelector)

buffer(bufferOpenings, bufferClosingSelector) 监控一个 Observable,bufferOpenings,它会发出 BufferOpening 对象。 每当它观察到这样的发出项目时,它都会创建一个新的集合来开始收集源 Observable 发出的项目,并将 bufferOpenings Observable 传递给 bufferClosingSelector 函数。 该函数返回一个 Observable。 buffer 监控该 Observable,当它检测到来自它的发出项目时,它会发出当前的集合并开始一个新的集合。

buffer 在以下每个发行版中都可以找到

  • rx.all.js
  • rx.all.compat.js
  • rx.coincidence.js

buffer 需要以下发行版之一

  • rx.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

bufferWithCount(count)

bufferWithCount(count)

bufferWithCount(count) 发出不重叠的缓冲区,每个缓冲区最多包含 count 个来自源 Observable 的项目(最后发出的缓冲区可能少于 count 个项目)。

bufferWithCount(count, skip)

bufferWithCount(count,skip)

bufferWithCount(count, skip) 从源 Observable 发出的第一个项目开始创建一个新缓冲区,并在接下来的每 skip 个项目之后创建一个新缓冲区,并将每个缓冲区填充 count 个项目:初始项目和 count-1 个后续项目,在每个缓冲区完成后发出它们。根据 countskip 的值,这些缓冲区可能重叠(多个缓冲区可能包含同一个项目),也可能存在间隙(源 Observable 发出的项目在任何缓冲区中都没有表示)。

bufferWithCount 存在于以下每个发行版中:

  • rx.js
  • rx.compat.js
  • rx.all.js
  • rx.all.compat.js
  • rx.lite.extras.js

bufferWithTime(timeSpan)

bufferWithTime(timeSpan)

bufferWithTime(timeSpan) 定期发出一个新的项目集合,每 timeSpan 毫秒发出一次,包含自上次捆绑发出以来或在第一次捆绑的情况下,自订阅源 Observable 以来源 Observable 发出的所有项目。此运算符变体还有一个版本,它接受一个 Scheduler 作为参数,并使用它来控制时间跨度;默认情况下,此变体使用 timeout 调度程序。

bufferWithTime(timeSpan, timeShift)

bufferWithTime(timeSpan,timeShift)

bufferWithTime(timeSpan, timeShift)timeShift 毫秒创建一个新的项目集合,并用自该时间起源 Observable 发出的所有项目填充该捆绑,直到自集合创建以来已经过去了 timeSpan 毫秒,然后将其作为自己的发出发出。如果 timeSpan 长于 timeShift,则发出的捆绑将代表重叠的时间段,因此它们可能包含重复的项目。此运算符变体还有一个版本,它接受一个 Scheduler 作为参数,并使用它来控制时间跨度;默认情况下,此变体使用 timeout 调度程序。

bufferWithTimeOrCount(timeSpan, count)

bufferWithTimeOrCount(timeSpan,count)

bufferWithTimeOrCount(timeSpan, count) 每当源 Observable 发出 count 个项目时,就会发出一个新的项目集合,或者,如果自上次集合发出以来已经过去了 timeSpan 毫秒,它将发出源 Observable 在该时间跨度内发出的任何项目的集合,即使少于 count 个。此运算符变体还有一个版本,它接受一个 Scheduler 作为参数,并使用它来控制时间跨度;默认情况下,此变体使用 timeout 调度程序。

bufferWithTimebufferWithTimeOrCount 存在于以下每个发行版中:

  • rx.all.js
  • rx.all.compat.js
  • rx.time.js

bufferWithTimebufferWithTimeOrCount 需要以下发行版之一:

  • rx.time.js 需要 rx.jsrx.compat.js
  • 否则:rx.lite.jsrx.lite.compat.js

在 RxKotlin 中,Buffer 有几个变体

buffer(count)

buffer(count)

buffer(count)List 的形式发出不重叠的缓冲区,每个缓冲区最多包含 count 个来自源 Observable 的项目(最后发出的 List 可能少于 count 个项目)。

buffer(count, skip)

buffer(count,skip)

buffer(count, skip) 从源 Observable 发出的第一个项目开始创建一个新的缓冲区,并在之后的每 skip 个项目后创建一个新的缓冲区,并用 count 个项目填充每个缓冲区:初始项目和 count-1 个后续项目。 它以 List 的形式发出这些缓冲区。 具体来说,根据 countskip 的值,这些缓冲区可能会重叠(多个缓冲区可能包含相同的项目),或者它们可能存在间隙(其中源 Observable 发出的项目没有在任何缓冲区中表示)。

buffer(bufferClosingSelector)

buffer(bufferClosingSelector)

当它订阅源 Observable 时,buffer(bufferClosingSelector) 开始将它的发射收集到 List 中,并且它还调用 bufferClosingSelector 来生成一个第二个 Observable。 当这个第二个 Observable 发出一个 TClosing 对象时,buffer 会发出当前的 List,并重复此过程:开始一个新的 List,并调用 bufferClosingSelector 来创建一个新的 Observable 来监控。 它会一直这样做,直到源 Observable 终止。

buffer(boundary)

buffer(boundary)

buffer(boundary) 监控一个 Observable,boundary。 每次该 Observable 发出一个项目时,它都会创建一个新的 List 来开始收集源 Observable 发出的项目,并发出之前的 List

buffer(bufferOpenings, bufferClosingSelector)

buffer(bufferOpenings,bufferClosingSelector)

buffer(bufferOpenings, bufferClosingSelector) 监控一个 Observable,bufferOpenings,它会发出 BufferOpening 对象。 每当它观察到这样的发出项目时,它都会创建一个新的 List 来开始收集源 Observable 发出的项目,并将 bufferOpenings Observable 传递给 closingSelector 函数。 该函数返回一个 Observable。 buffer 监控该 Observable,当它检测到来自它的发出项目时,它会关闭 List 并将其作为自己的发射发出。

buffer(timespan, unit[, scheduler])

buffer(timespan,unit)

buffer(timespan, unit) 定期发出一个新的 List 项目,每 timespan 时间量发出一次,包含自上次捆绑发射以来或在第一个捆绑的情况下,自订阅源 Observable 以来源 Observable 发出的所有项目。 此操作符变体还有一个版本,它将 Scheduler 作为参数并使用它来控制时间跨度;默认情况下,此变体使用 computation Scheduler。

buffer(timespan, unit, count[, scheduler])

buffer(timespan,unit,count)

buffer(timespan, unit, count) 每当源 Observable 发出 count 个项目时,都会发出一个新的 List 项目,或者,如果自上次捆绑发射以来已经过去了 timespan,它会发出一个捆绑包,其中包含源 Observable 在该跨度内发出的项目数量,即使少于 count 个项目。 此操作符变体还有一个版本,它将 Scheduler 作为参数并使用它来控制时间跨度;默认情况下,此变体使用 computation Scheduler。

buffer(timespan, timeshift, unit[, scheduler])

buffer(timespan,timeshift,unit)

buffer(timespan, timeshift, unit)timeshift 时间段创建一个新的 List 项目,并用从该时间到自捆绑包创建以来过去了 timespan 时间为止源 Observable 发出的所有项目填充这个捆绑包,然后发出这个 List 作为它自己的发射。 如果 timespan 长于 timeshift,则发出的捆绑包将表示重叠的时间段,因此它们可能包含重复的项目。 此操作符变体还有一个版本,它将 Scheduler 作为参数并使用它来控制时间跨度;默认情况下,此变体使用 computation Scheduler。

在 Rx.NET 中,Buffer 有几个变体。对于每种变体,您可以将源 Observable 作为第一个参数传入,也可以将其作为源 Observable 的实例方法调用(在这种情况下,您可以省略该参数)

Buffer(count)

Buffer(count)

Buffer(count)IList 的形式发出非重叠的缓冲区,每个缓冲区最多包含源 Observable 的 count 个项目(最终发出的 IList 可能少于 count 个项目)。

Buffer(count, skip)

Buffer(count,skip)

Buffer(count, skip) 从源 Observable 发出的第一个项目开始创建一个新缓冲区,并在接下来的每 skip 个项目之后创建一个新缓冲区,并将每个缓冲区填充 count 个项目:初始项目和 count-1 个后续项目。它将这些缓冲区作为 IList 发出。根据 countskip 的值,这些缓冲区可能重叠(多个缓冲区可能包含同一个项目),也可能存在间隙(源 Observable 发出的项目在任何缓冲区中都没有表示)。

Buffer(bufferClosingSelector)

Buffer(bufferClosingSelector)

当它订阅源 Observable 时,Buffer(bufferClosingSelector) 开始将它的发出收集到一个 IList 中,并且它还调用 bufferClosingSelector 来生成一个第二个 Observable。当这个第二个 Observable 发出一个 TBufferClosing 对象时,Buffer 会发出当前的 IList,并重复此过程:开始一个新的 IList,并调用 bufferClosingSelector 来创建一个新的 Observable 来监控。它将一直这样做,直到源 Observable 终止。

Buffer(bufferOpenings,bufferClosingSelector)

Buffer(bufferOpenings,bufferClosingSelector)

Buffer(bufferOpenings, bufferClosingSelector) 监控一个 Observable,BufferOpenings,它发出 TBufferOpening 对象。每当它观察到这样一个发出的项目时,它就会创建一个新的 IList 来开始收集源 Observable 发出的项目,并将 TBufferOpening 对象传递给 bufferClosingSelector 函数。该函数返回一个 Observable。Buffer 监控该 Observable,并在检测到它发出的项目时,关闭 IList 并将其作为自己的发出发出。

Buffer(timeSpan)

Buffer(timeSpan)

Buffer(timeSpan) 定期发出一个新的 IList 项目,每 timeSpan 时间发出一次,包含自上次捆绑发出以来或在第一次列表的情况下,自订阅源 Observable 以来源 Observable 发出的所有项目。此运算符变体还有一个版本,它接受一个 IScheduler 作为参数,并使用它来控制时间跨度。

Buffer(timeSpan, count)

Buffer(timeSpan,count)

Buffer(timeSpan, count) 每当源 Observable 发出 count 个项目时,就会发出一个新的 IList 项目,或者,如果自上次列表发出以来已经过去了 timeSpan,它将发出源 Observable 在该时间跨度内发出的任何项目的列表,即使少于 count 个。此运算符变体还有一个版本,它接受一个 IScheduler 作为参数,并使用它来控制时间跨度。

Buffer(timeSpan, timeShift)

Buffer(timeSpan,timeShift)

Buffer(timeSpan, timeShift)timeShift 时间段创建一个新的 IList 项目,并用自该时间起源 Observable 发出的所有项目填充该列表,直到自列表创建以来已经过去了 timeSpan 时间,然后将其作为自己的发出发出。如果 timeSpan 长于 timeShift,则发出的列表将代表重叠的时间段,因此它们可能包含重复的项目。此运算符变体还有一个版本,它接受一个 IScheduler 作为参数,并使用它来控制时间跨度。

RxPHP 将此运算符实现为 bufferWithCount

将可观察序列的每个元素投影到零个或多个缓冲区,这些缓冲区是根据元素计数信息生成的。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/bufferWithCount/bufferWithCount.php

$source = Rx\Observable::range(1, 6)
    ->bufferWithCount(2)
    ->subscribe($stdoutObserver);

   
Next value: [1,2]
Next value: [3,4]
Next value: [5,6]
Complete!
    
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/bufferWithCount/bufferWithCountAndSkip.php

$source = Rx\Observable::range(1, 6)
    ->bufferWithCount(2, 1)
    ->subscribe($stdoutObserver);

   
Next value: [1,2]
Next value: [2,3]
Next value: [3,4]
Next value: [4,5]
Next value: [5,6]
Next value: [6]
Complete!
    

RxPY 有几个 Buffer 变体:bufferbuffer_with_countbuffer_with_timebuffer_with_time_or_count。对于这些变体中的每一个,都有可选参数来改变运算符的行为。与往常一样,在 RxPY 中,当运算符可能接受多个可选参数时,请确保在调用运算符时在参数列表中命名参数,以避免歧义。

buffer(buffer_openings)

buffer(buffer_openings)

buffer(buffer_openings=boundaryObservable) 监控一个 Observable,buffer_openings。每当该 Observable 发出一个项目时,它就会创建一个新的数组来开始收集源 Observable 发出的项目,并发出上一个数组。

buffer(closing_selector)

buffer(closing_selector)

buffer(closing_selector=closingSelector) 开始收集源 Observable 在订阅时发出的项目,并且还调用 closing_selector 函数来生成第二个 Observable。它监控这个新的 Observable,并且,当它完成或发出一个项目时,它发出当前的数组,开始一个新的数组来收集源 Observable 发出的项目,并再次调用 closing_selector 来生成一个新的 Observable 来监控,以便确定何时发出新数组。它重复此过程,直到源 Observable 终止,然后它发出最终的数组。

buffer(closing_selector,buffer_closing_selector)

buffer(closing_selector=openingSelector, buffer_closing_selector=closingSelector) 通过调用 closing_selector 来获取一个 Observable 开始。它监控这个 Observable,并且,每当它发出一个项目时,buffer 就会创建一个新的数组,开始将源 Observable 随后发出的项目收集到这个数组中,并调用 buffer_closing_selector 来获取一个新的 Observable 来控制该数组的关闭。当这个新的 Observable 发出一个项目或终止时,buffer 会关闭并发出 Observable 控制的数组。

buffer_with_count(count)

buffer_with_count(count)

buffer_with_count(count) 以数组的形式发出非重叠的缓冲区,每个缓冲区最多包含源 Observable 的 count 个项目(最终发出的数组可能少于 count 个项目)。

buffer_with_count(count, skip)

buffer_with_count(count,skip)

buffer_with_count(count, skip=skip) 从源 Observable 发出的第一个项目开始创建一个新缓冲区,并在接下来的每 skip 个项目之后创建一个新缓冲区,并将每个缓冲区填充 count 个项目:初始项目和 count-1 个后续项目。它将这些缓冲区作为数组发出。根据 countskip 的值,这些缓冲区可能重叠(多个缓冲区可能包含同一个项目),也可能存在间隙(源 Observable 发出的项目在任何缓冲区中都没有表示)。

buffer_with_time(timespan)

buffer_with_time(timespan)

buffer_with_time(timespan) 定期发出一个新的项目数组,每 timespan 毫秒发出一次,包含自上次捆绑发出以来或在第一次捆绑的情况下,自订阅源 Observable 以来源 Observable 发出的所有项目。此运算符变体还有一个版本,它接受一个 scheduler 参数并使用它来控制时间跨度;默认情况下,此变体使用 timeout 调度程序。

buffer_with_time(timespan, timeshift)

buffer_with_time(timespan,timeshift)

buffer(timespan, timeshift=timeshift)timeshift 毫秒创建一个新的项目数组,并用自该时间起源 Observable 发出的所有项目填充该数组,直到自数组创建以来已经过去了 timespan 毫秒,然后将其作为自己的发出发出。如果 timespan 长于 timeshift,则发出的数组将代表重叠的时间段,因此它们可能包含重复的项目。此运算符变体还有一个版本,它接受一个 scheduler 参数并使用它来控制时间跨度;默认情况下,此变体使用 timeout 调度程序。

buffer_with_time_or_count(timespan, count)

buffer_with_time_or_count(timespan,count)

buffer_with_time_or_count(timespan, count) 每当源 Observable 发出 count 个项目时,就会发出一个新的项目数组,或者,如果自上次捆绑发出以来已经过去了 timespan 毫秒,它将发出源 Observable 在该时间跨度内发出的任何项目的数组,即使少于 count 个。此运算符变体还有一个版本,它接受一个 scheduler 参数并使用它来控制时间跨度;默认情况下,此变体使用 timeout 调度程序。

Rx.rb 有三个 Buffer 运算符变体

buffer_with_count(count)

buffer_with_count(count)

buffer_with_count(count) 以数组的形式发出非重叠的缓冲区,每个缓冲区最多包含源 Observable 的 count 个项目(最终发出的数组可能少于 count 个项目)。

buffer_with_count(count,skip)

buffer_with_count(count,skip)

buffer_with_count(count, skip=skip) 从源 Observable 发出的第一个项目开始创建一个新缓冲区,并在接下来的每 skip 个项目之后创建一个新缓冲区,并将每个缓冲区填充 count 个项目:初始项目和 count-1 个后续项目。它将这些缓冲区作为数组发出。根据 countskip 的值,这些缓冲区可能重叠(多个缓冲区可能包含同一个项目),也可能存在间隙(源 Observable 发出的项目在任何缓冲区中都没有表示)。

buffer_with_time(timespan)

buffer_with_time(timespan)

buffer_with_time(timespan) 定期发出一个新的项目数组,每 timespan 毫秒发出一次,包含自上次捆绑发出以来或在第一次捆绑的情况下,自订阅源 Observable 以来源 Observable 发出的所有项目。

RxScala 有两种 Buffer 变体——slidingBuffertumblingBuffer——每种变体都有不同的变体,它们使用不同的方式来组装它们发出的缓冲区

slidingBuffer(count, skip)

slidingBuffer(count,skip)

slidingBuffer(count, skip) 从源 Observable 发出的第一个项目开始创建一个新缓冲区,并在接下来的每 skip 个项目之后创建一个新缓冲区,并将每个缓冲区填充 count 个项目:初始项目和 count-1 个后续项目。它将这些缓冲区作为 Seqs 发出。根据 countskip 的值,这些缓冲区可能重叠(多个缓冲区可能包含同一个项目),也可能存在间隙(源 Observable 发出的项目在任何缓冲区中都没有表示)。

slidingBuffer(timespan, timeshift)

slidingBuffer(timespan,timeshift)

slidingBuffer(timespan, timeshift) 每隔 timeshift(一个 Duration)创建一个新的 Seq,并将该缓冲区填充为从该时间开始到 timespan(也是一个 Duration)结束之前,源 Observable 发出的所有项目,然后将该 Seq 作为它自己的发射。如果 timespan 长于 timeshift,则发射的数组将表示重叠的时间段,因此它们可能包含重复的项目。此操作符变体的另一个版本接受一个 Scheduler 作为参数,并使用它来控制时间跨度。

slidingBuffer(openings, closings)

slidingBuffer(openings,closings)

slidingBuffer(openings,closings) 监视 openings Observable,并在它发射一个 Opening 项目时,slidingBuffer 创建一个新的 Seq,开始将源 Observable 随后发射的项目收集到该缓冲区中,并调用 closings 获取一个新的 Observable 来控制该缓冲区的关闭。当这个新的 Observable 发射一个项目或终止时,slidingBuffer 将关闭并发射 Seq,该 Observable 控制该 Seq

tumblingBuffer(count)

tumblingBuffer(count)

tumblingBuffer(count) 发射非重叠缓冲区,以 Seq 的形式,每个缓冲区最多包含来自源 Observable 的 count 个项目(最后一个发射的缓冲区可能少于 count 个项目)。

tumblingBuffer(boundary)

tumblingBuffer(boundary)

tumblingBuffer(boundary) 监视一个 Observable,boundary。每次该 Observable 发射一个项目时,它都会创建一个新的 Seq 来开始收集源 Observable 发射的项目,并发射之前的 Seq。此操作符变体具有一个可选的第二个参数 initialCapacity,您可以使用它来指示这些缓冲区的预期大小,从而使内存分配更有效。

tumblingBuffer(timespan)

tumblingBuffer(timespan)

tumblingBuffer(timespan) 定期发射一个新的 Seq,每隔 timespan(一个 Duration),包含自上一个捆绑发射以来或在第一个捆绑的情况下,自订阅源 Observable 以来源 Observable 发射的所有项目。此操作符变体具有一个可选的第二个参数 scheduler,您可以使用它来设置您想要控制时间跨度计算的 Scheduler

tumblingBuffer(timespan, count)

tumblingBuffer(timespan,count)

tumblingBuffer(timespan, count) 每隔 count 个项目发射一个新的 Seq,或者,如果 timespan(一个 Duration)自上一个捆绑发射以来已过,它将发射一个 Seq,该 Seq 包含在该时间段内源 Observable 发射的项目数量,即使少于 count 个。此操作符变体具有一个可选的第三个参数 scheduler,您可以使用它来设置您想要控制时间跨度计算的 Scheduler