重播

确保所有观察者看到相同的序列发射的项目,即使它们在 Observable 开始发射项目之后订阅。

Replay

一个 可连接的 Observable 类似于一个普通的 Observable,除了它不会在被订阅时开始发射项目,而是在 Connect 操作符应用于它时才会开始。这样,你可以选择Observable 开始发射项目的时间。

如果你在将 Observable 转换为可连接的 Observable 之前将 Replay 操作符应用于它,那么结果的可连接的 Observable 将始终向任何未来的观察者发射相同的完整序列,即使这些观察者在可连接的 Observable 开始向其他订阅的观察者发射项目之后订阅。

另见

特定语言信息

replay

在 RxGroovy 中,replay 操作符的一种变体返回一个可连接的 Observable。你必须 Publish 这个可连接的 Observable,观察者才能订阅它,然后 Connect 到它以观察它的发射。

这种 replay 操作符的变体允许你设置一个最大缓冲区大小来限制 replay 将缓冲并重播到后续观察者的项目数量,或者建立一个移动时间窗口来定义发射的项目何时变得太旧而无法缓冲和重播。

replay

还有一种 replay 返回一个普通的 Observable。这些变体以一个转换函数作为参数;这个函数以源 Observable 发射的项目作为参数,并返回由结果 Observable 发射的项目。所以实际上,这个操作符不是重播源 Observable,而是重播由这个函数转换后的源 Observable。

这种 replay 操作符的变体允许你设置一个最大缓冲区大小来限制 replay 将缓冲并重播到后续观察者的项目数量,或者建立一个移动时间窗口来定义发射的项目何时变得太旧而无法缓冲和重播。

replay

在 RxJava 中,replay 操作符的一种变体返回一个可连接的 Observable。你必须 Publish 这个可连接的 Observable,观察者才能订阅它,然后 Connect 到它以观察它的发射。

这种 replay 操作符的变体允许你设置一个最大缓冲区大小来限制 replay 将缓冲并重播到后续观察者的项目数量,或者建立一个移动时间窗口来定义发射的项目何时变得太旧而无法缓冲和重播。

replay

还有一种 replay 返回一个普通的 Observable。这些变体以一个转换函数作为参数;这个函数以源 Observable 发射的项目作为参数,并返回由结果 Observable 发射的项目。所以实际上,这个操作符不是重播源 Observable,而是重播由这个函数转换后的源 Observable。

这种 replay 操作符的变体允许你设置一个最大缓冲区大小来限制 replay 将缓冲并重播到后续观察者的项目数量,或者建立一个移动时间窗口来定义发射的项目何时变得太旧而无法缓冲和重播。

replay

在 RxJs 中,replay 操作符接受四个可选参数并返回一个普通的 Observable

selector
一个转换函数,以源 Observable 发射的项目作为参数,并返回由结果 Observable 发射的项目
bufferSize
要缓冲并重播到后续观察者的最大项目数量
window
缓冲区中项目可以被丢弃而不会发射到后续观察者的年龄(以毫秒为单位)
scheduler
这个操作符将在此 调度器 上运行

示例代码

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .do(function (x) {
        console.log('Side effect');
    });

var published = source
    .replay(function (x) {
        return x.take(2).repeat(2);
    }, 3);

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Side effect
Next: SourceA0
Side effect
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceA0
Next: SourceA1
Completed
Side effect
Next: SourceB1
Next: SourceB0
Next: SourceB1
Completed

还有一个 shareReplay 操作符,它跟踪观察者的数量,并在该数量降至零时断开与源 Observable 的连接。shareReplay 接受三个可选参数并返回一个普通的 Observable

bufferSize
要缓冲并重播到后续观察者的最大项目数量
window
缓冲区中项目可以被丢弃而不会发射到后续观察者的年龄(以毫秒为单位)
scheduler
这个操作符将在此 调度器 上运行

示例代码

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(4)
    .doAction(function (x) {
        console.log('Side effect');
    });

var published = source
    .shareReplay(3);

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

// Creating a third subscription after the previous two subscriptions have
// completed. Notice that no side effects result from this subscription,
// because the notifications are cached and replayed.
Rx.Observable
    .return(true)
    .delay(6000)
    .flatMap(published)
    .subscribe(createObserver('SourceC'));

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Side effect
Next: SourceA2
Next: SourceB2
Side effect
Next: SourceA3
Next: SourceB3
Completed
Completed
Next: SourceC1
Next: SourceC2
Next: SourceC3
Completed

replayshareReplay 位于以下发行版中

  • rx.all.js
  • rx.all.compat.js
  • rx.binding.js(需要 rx.jsrx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

待定

RxPHP 将此操作符实现为 replay

返回一个可观察的序列,它是对可连接的可观察的序列(共享对基础序列的单一订阅,重播通知,受重播缓冲区最大时间长度的限制)调用选择器后的结果。此操作符是使用 ReplaySubject 的 Multicast 的特殊情况。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/replay/replay.php

$interval = \Rx\Observable::interval(1000);

$source = $interval
    ->take(2)
    ->doOnNext(function ($x) {
        echo $x, ' something', PHP_EOL;
        echo 'Side effect', PHP_EOL;
    });

$published = $source
    ->replay(function (\Rx\Observable $x) {
        return $x->take(2)->repeat(2);
    }, 3);

$published->subscribe($createStdoutObserver('SourceA '));
$published->subscribe($createStdoutObserver('SourceB '));

   
0 something
Side effect
0 something
Side effect
SourceA Next value: 0
SourceB Next value: 0
SourceA Next value: 0
SourceB Next value: 0
SourceA Next value: 0
SourceB Next value: 0
SourceA Next value: 0
SourceA Complete!
SourceB Next value: 0
SourceB Complete!
1 something
Side effect
1 something
Side effect
    

RxPHP 还具有一个 shareReplay 操作符。

返回一个可观察的序列,共享对基础序列的单一订阅,重播通知,受重播缓冲区最大时间长度的限制。此操作符是 replay 的特殊情况,当观察者数量从零变为一时创建订阅,然后与所有后续观察者共享该订阅,直到观察者数量返回到零,此时订阅被释放。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/shareReplay.php

$interval = Rx\Observable::interval(1000);

$source = $interval
    ->take(4)
    ->doOnNext(function ($x) {
        echo 'Side effect', PHP_EOL;
    });

$published = $source
    ->shareReplay(3);

$published->subscribe($createStdoutObserver('SourceA '));
$published->subscribe($createStdoutObserver('SourceB '));

Rx\Observable
    ::of(true)
    ->concatMapTo(\Rx\Observable::timer(6000))
    ->flatMap(function () use ($published) {
        return $published;
    })
    ->subscribe($createStdoutObserver('SourceC '));

   
Side effect
SourceA Next value: 0
SourceB Next value: 0
Side effect
SourceA Next value: 1
SourceB Next value: 1
Side effect
SourceA Next value: 2
SourceB Next value: 2
Side effect
SourceA Next value: 3
SourceB Next value: 3
SourceA Complete!
SourceB Complete!
SourceC Next value: 1
SourceC Next value: 2
SourceC Next value: 3
SourceC Complete!
    

待定

待定