一个 可连接的 Observable 类似于一个普通的 Observable,除了它不会在被订阅时开始发射项目,而是在 Connect 操作符应用于它时才会开始。这样,你可以选择Observable 开始发射项目的时间。
如果你在将 Observable 转换为可连接的 Observable 之前将 Replay 操作符应用于它,那么结果的可连接的 Observable 将始终向任何未来的观察者发射相同的完整序列,即使这些观察者在可连接的 Observable 开始向其他订阅的观察者发射项目之后订阅。
在 RxGroovy 中,replay 操作符的一种变体返回一个可连接的 Observable。你必须 Publish 这个可连接的 Observable,观察者才能订阅它,然后 Connect 到它以观察它的发射。
这种 replay 操作符的变体允许你设置一个最大缓冲区大小来限制 replay 将缓冲并重播到后续观察者的项目数量,或者建立一个移动时间窗口来定义发射的项目何时变得太旧而无法缓冲和重播。
replay()replay(Scheduler)replay(int)replay(int,Scheduler)replay(long,TimeUnit)replay(long,TimeUnit,Scheduler)replay(int,long,TimeUnit)replay(int,long,TimeUnit,Scheduler)
还有一种 replay 返回一个普通的 Observable。这些变体以一个转换函数作为参数;这个函数以源 Observable 发射的项目作为参数,并返回由结果 Observable 发射的项目。所以实际上,这个操作符不是重播源 Observable,而是重播由这个函数转换后的源 Observable。
这种 replay 操作符的变体允许你设置一个最大缓冲区大小来限制 replay 将缓冲并重播到后续观察者的项目数量,或者建立一个移动时间窗口来定义发射的项目何时变得太旧而无法缓冲和重播。
replay(Func1)replay(Func1,Scheduler)replay(Func1,int)replay(Func1,int,Scheduler)replay(Func1,long,TimeUnit)replay(Func1,long,TimeUnit,Scheduler)replay(Func1,int,long,TimeUnit)replay(Func1,int,long,TimeUnit,Scheduler)
在 RxJava 中,replay 操作符的一种变体返回一个可连接的 Observable。你必须 Publish 这个可连接的 Observable,观察者才能订阅它,然后 Connect 到它以观察它的发射。
这种 replay 操作符的变体允许你设置一个最大缓冲区大小来限制 replay 将缓冲并重播到后续观察者的项目数量,或者建立一个移动时间窗口来定义发射的项目何时变得太旧而无法缓冲和重播。
replay()replay(Scheduler)replay(int)replay(int,Scheduler)replay(long,TimeUnit)replay(long,TimeUnit,Scheduler)replay(int,long,TimeUnit)replay(int,long,TimeUnit,Scheduler)
还有一种 replay 返回一个普通的 Observable。这些变体以一个转换函数作为参数;这个函数以源 Observable 发射的项目作为参数,并返回由结果 Observable 发射的项目。所以实际上,这个操作符不是重播源 Observable,而是重播由这个函数转换后的源 Observable。
这种 replay 操作符的变体允许你设置一个最大缓冲区大小来限制 replay 将缓冲并重播到后续观察者的项目数量,或者建立一个移动时间窗口来定义发射的项目何时变得太旧而无法缓冲和重播。
replay(Func1)replay(Func1,Scheduler)replay(Func1,int)replay(Func1,int,Scheduler)replay(Func1,long,TimeUnit)replay(Func1,long,TimeUnit,Scheduler)replay(Func1,int,long,TimeUnit)replay(Func1,int,long,TimeUnit,Scheduler)
在 RxJs 中,replay 操作符接受四个可选参数并返回一个普通的 Observable
selectorbufferSizewindowschedulervar interval = Rx.Observable.interval(1000);
var source = interval
.take(2)
.do(function (x) {
console.log('Side effect');
});
var published = source
.replay(function (x) {
return x.take(2).repeat(2);
}, 3);
published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));
function createObserver(tag) {
return Rx.Observer.create(
function (x) { console.log('Next: ' + tag + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); });
}Side effect Next: SourceA0 Side effect Next: SourceB0 Side effect Next: SourceA1 Next: SourceA0 Next: SourceA1 Completed Side effect Next: SourceB1 Next: SourceB0 Next: SourceB1 Completed
还有一个 shareReplay 操作符,它跟踪观察者的数量,并在该数量降至零时断开与源 Observable 的连接。shareReplay 接受三个可选参数并返回一个普通的 Observable
bufferSizewindowschedulervar interval = Rx.Observable.interval(1000);
var source = interval
.take(4)
.doAction(function (x) {
console.log('Side effect');
});
var published = source
.shareReplay(3);
published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));
// Creating a third subscription after the previous two subscriptions have
// completed. Notice that no side effects result from this subscription,
// because the notifications are cached and replayed.
Rx.Observable
.return(true)
.delay(6000)
.flatMap(published)
.subscribe(createObserver('SourceC'));
function createObserver(tag) {
return Rx.Observer.create(
function (x) { console.log('Next: ' + tag + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); });
}Side effect Next: SourceA0 Next: SourceB0 Side effect Next: SourceA1 Next: SourceB1 Side effect Next: SourceA2 Next: SourceB2 Side effect Next: SourceA3 Next: SourceB3 Completed Completed Next: SourceC1 Next: SourceC2 Next: SourceC3 Completed
replay 和 shareReplay 位于以下发行版中
rx.all.jsrx.all.compat.jsrx.binding.js(需要 rx.js 或 rx.compat.js)rx.lite.jsrx.lite.compat.js待定
RxPHP 将此操作符实现为 replay。
返回一个可观察的序列,它是对可连接的可观察的序列(共享对基础序列的单一订阅,重播通知,受重播缓冲区最大时间长度的限制)调用选择器后的结果。此操作符是使用 ReplaySubject 的 Multicast 的特殊情况。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/replay/replay.php
$interval = \Rx\Observable::interval(1000);
$source = $interval
->take(2)
->doOnNext(function ($x) {
echo $x, ' something', PHP_EOL;
echo 'Side effect', PHP_EOL;
});
$published = $source
->replay(function (\Rx\Observable $x) {
return $x->take(2)->repeat(2);
}, 3);
$published->subscribe($createStdoutObserver('SourceA '));
$published->subscribe($createStdoutObserver('SourceB '));
0 something
Side effect
0 something
Side effect
SourceA Next value: 0
SourceB Next value: 0
SourceA Next value: 0
SourceB Next value: 0
SourceA Next value: 0
SourceB Next value: 0
SourceA Next value: 0
SourceA Complete!
SourceB Next value: 0
SourceB Complete!
1 something
Side effect
1 something
Side effect
RxPHP 还具有一个 shareReplay 操作符。
返回一个可观察的序列,共享对基础序列的单一订阅,重播通知,受重播缓冲区最大时间长度的限制。此操作符是 replay 的特殊情况,当观察者数量从零变为一时创建订阅,然后与所有后续观察者共享该订阅,直到观察者数量返回到零,此时订阅被释放。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/shareReplay.php
$interval = Rx\Observable::interval(1000);
$source = $interval
->take(4)
->doOnNext(function ($x) {
echo 'Side effect', PHP_EOL;
});
$published = $source
->shareReplay(3);
$published->subscribe($createStdoutObserver('SourceA '));
$published->subscribe($createStdoutObserver('SourceB '));
Rx\Observable
::of(true)
->concatMapTo(\Rx\Observable::timer(6000))
->flatMap(function () use ($published) {
return $published;
})
->subscribe($createStdoutObserver('SourceC '));
Side effect
SourceA Next value: 0
SourceB Next value: 0
Side effect
SourceA Next value: 1
SourceB Next value: 1
Side effect
SourceA Next value: 2
SourceB Next value: 2
Side effect
SourceA Next value: 3
SourceB Next value: 3
SourceA Complete!
SourceB Complete!
SourceC Next value: 1
SourceC Next value: 2
SourceC Next value: 3
SourceC Complete!
待定
待定