一个 可连接的 Observable 类似于一个普通的 Observable,除了它不会在被订阅时开始发射项目,而是在 Connect 操作符被应用于它时才会发射项目。这样,你就可以在您选择的时间提示 Observable 开始发射项目。
The RefCount 操作符自动连接和断开连接到可连接的 Observable 的过程。它作用于可连接的 Observable 并返回一个普通的 Observable。当第一个观察者订阅此 Observable 时,RefCount 连接到底层可连接的 Observable。然后,RefCount 跟踪有多少其他观察者订阅了它,并且在最后一个观察者完成订阅之前不会断开与底层可连接的 Observable 的连接。
待定
RxGroovy 实现此操作符为 refCount
。
refCount()
还存在一个 share
操作符,它等同于将 publish
和 refCount
操作符按顺序应用于 Observable。
share()
RxJava 实现此操作符为 refCount
。
refCount()
还存在一个 share
操作符,它等同于将 publish
和 refCount
操作符按顺序应用于 Observable。
share()
RxJava 实现此操作符为 refCount
。
var interval = Rx.Observable.interval(1000); var source = interval .take(2) .doAction(function (x) { console.log('Side effect'); }); var published = source.publish().refCount(); published.subscribe(createObserver('SourceA')); published.subscribe(createObserver('SourceB')); function createObserver(tag) { return Rx.Observer.create( function (x) { console.log('Next: ' + tag + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }
Side effect Next: SourceA0 Next: SourceB0 Side effect Next: SourceA1 Next: SourceB1 Completed Completed
refCount
存在于以下发行版中
rx.all.js
rx.all.compat.js
rx.binding.js
(需要 rx.js
、rx.compat.js
、rx.lite.js
或 rx.lite.compat.js
)rx.lite.js
rx.lite.compat.js
还存在一个 share
操作符,它等同于将 publish
和 refCount
操作符按顺序应用于 Observable。一个名为 shareValue
的变体将一个项目作为参数,它将在开始从源 Observable 发射项目之前将其发射给任何订阅者。
var interval = Rx.Observable.interval(1000); var source = interval .take(2) .do( function (x) { console.log('Side effect'); }); var published = source.share(); // When the number of observers subscribed to published observable goes from // 0 to 1, we connect to the underlying observable sequence. published.subscribe(createObserver('SourceA')); // When the second subscriber is added, no additional subscriptions are added to the // underlying observable sequence. As a result the operations that result in side // effects are not repeated per subscriber. published.subscribe(createObserver('SourceB')); function createObserver(tag) { return Rx.Observer.create( function (x) { console.log('Next: ' + tag + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }
Side effect Next: SourceA0 Next: SourceB0 Side effect Next: SourceA1 Next: SourceB1 Completed
share
和 shareValue
存在于以下发行版中
rx.all.js
rx.all.compat.js
rx.binding.js
(需要 rx.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
RxPHP 实现此操作符为 share
。
返回一个 Observable 序列,它共享对底层序列的单个订阅。此操作符是 publish 的特化,当观察者数量从零变为一时创建订阅,然后与所有后续观察者共享该订阅,直到观察者数量返回到零,此时订阅被处置。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/share.php //With Share $source = \Rx\Observable::interval(1000) ->take(2) ->doOnNext(function ($x) { echo "Side effect\n"; }); $published = $source->share(); $published->subscribe($createStdoutObserver('SourceA ')); $published->subscribe($createStdoutObserver('SourceB '));
Side effect SourceA Next value: 0 SourceB Next value: 0 Side effect SourceA Next value: 1 SourceB Next value: 1 SourceA Complete! SourceB Complete!
RxPHP 还包含一个 singleInstance
操作符。
返回一个 Observable 序列,它共享对底层序列的单个订阅。即使所有先前的订阅都已结束,此 Observable 序列也可以重新订阅。此操作符的行为类似于 RxJS 5 中的 share()。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/singleInstance.php $interval = Rx\Observable::interval(1000); $source = $interval ->take(2) ->do(function () { echo 'Side effect', PHP_EOL; }); $single = $source->singleInstance(); // two simultaneous subscriptions, lasting 2 seconds $single->subscribe($createStdoutObserver('SourceA ')); $single->subscribe($createStdoutObserver('SourceB ')); \Rx\Observable::timer(5000)->subscribe(function () use ($single, &$createStdoutObserver) { // resubscribe two times again, more than 5 seconds later, // long after the original two subscriptions have ended $single->subscribe($createStdoutObserver('SourceC ')); $single->subscribe($createStdoutObserver('SourceD ')); });
Side effect SourceA Next value: 0 SourceB Next value: 0 Side effect SourceA Next value: 1 SourceB Next value: 1 SourceA Complete! SourceB Complete! Side effect SourceC Next value: 0 SourceD Next value: 0 Side effect SourceC Next value: 1 SourceD Next value: 1 SourceC Complete! SourceD Complete!
RxPHP 还包含一个 shareValue
操作符。
返回一个 Observable 序列,它共享对底层序列的单个订阅,并以 initialValue 开头。此操作符是 publishValue 的特化,当观察者数量从零变为一时创建订阅,然后与所有后续观察者共享该订阅,直到观察者数量返回到零,此时订阅被处置。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/shareValue.php $source = \Rx\Observable::interval(1000) ->take(2) ->doOnNext(function ($x) { echo "Side effect\n"; }); $published = $source->shareValue(42); $published->subscribe($createStdoutObserver('SourceA ')); $published->subscribe($createStdoutObserver('SourceB '));
SourceA Next value: 42 SourceB Next value: 42 Side effect SourceA Next value: 0 SourceB Next value: 0 Side effect SourceA Next value: 1 SourceB Next value: 1 SourceA Complete! SourceB Complete!
待定