RefCount

使可连接的 Observable 行为类似于普通的 Observable

RefCount

一个 可连接的 Observable 类似于一个普通的 Observable,除了它不会在被订阅时开始发射项目,而是在 Connect 操作符被应用于它时才会发射项目。这样,你就可以在您选择的时间提示 Observable 开始发射项目。

The RefCount 操作符自动连接和断开连接到可连接的 Observable 的过程。它作用于可连接的 Observable 并返回一个普通的 Observable。当第一个观察者订阅此 Observable 时,RefCount 连接到底层可连接的 Observable。然后,RefCount 跟踪有多少其他观察者订阅了它,并且在最后一个观察者完成订阅之前不会断开与底层可连接的 Observable 的连接。

另见

特定语言的信息

待定

待定

refCount

RxGroovy 实现此操作符为 refCount

还存在一个 share 操作符,它等同于将 publishrefCount 操作符按顺序应用于 Observable。

refCount

RxJava 实现此操作符为 refCount

还存在一个 share 操作符,它等同于将 publishrefCount 操作符按顺序应用于 Observable。

refCount

RxJava 实现此操作符为 refCount

示例代码

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .doAction(function (x) { console.log('Side effect'); });

var published = source.publish().refCount();

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Completed
Completed

refCount 存在于以下发行版中

  • rx.all.js
  • rx.all.compat.js
  • rx.binding.js (需要 rx.jsrx.compat.jsrx.lite.jsrx.lite.compat.js)
  • rx.lite.js
  • rx.lite.compat.js

还存在一个 share 操作符,它等同于将 publishrefCount 操作符按顺序应用于 Observable。一个名为 shareValue 的变体将一个项目作为参数,它将在开始从源 Observable 发射项目之前将其发射给任何订阅者。

示例代码

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .do(
        function (x) { console.log('Side effect'); });

var published = source.share();

// When the number of observers subscribed to published observable goes from
// 0 to 1, we connect to the underlying observable sequence.
published.subscribe(createObserver('SourceA'));
// When the second subscriber is added, no additional subscriptions are added to the
// underlying observable sequence. As a result the operations that result in side
// effects are not repeated per subscriber.
published.subscribe(createObserver('SourceB'));

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Completed

shareshareValue 存在于以下发行版中

  • rx.all.js
  • rx.all.compat.js
  • rx.binding.js (需要 rx.jsrx.compat.js)
  • rx.lite.js
  • rx.lite.compat.js

RxPHP 实现此操作符为 share

返回一个 Observable 序列,它共享对底层序列的单个订阅。此操作符是 publish 的特化,当观察者数量从零变为一时创建订阅,然后与所有后续观察者共享该订阅,直到观察者数量返回到零,此时订阅被处置。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/share.php

//With Share
$source = \Rx\Observable::interval(1000)
    ->take(2)
    ->doOnNext(function ($x) {
        echo "Side effect\n";
    });

$published = $source->share();

$published->subscribe($createStdoutObserver('SourceA '));
$published->subscribe($createStdoutObserver('SourceB '));

   
Side effect
SourceA Next value: 0
SourceB Next value: 0
Side effect
SourceA Next value: 1
SourceB Next value: 1
SourceA Complete!
SourceB Complete!
    

RxPHP 还包含一个 singleInstance 操作符。

返回一个 Observable 序列,它共享对底层序列的单个订阅。即使所有先前的订阅都已结束,此 Observable 序列也可以重新订阅。此操作符的行为类似于 RxJS 5 中的 share()。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/singleInstance.php

$interval = Rx\Observable::interval(1000);

$source = $interval
    ->take(2)
    ->do(function () {
        echo 'Side effect', PHP_EOL;
    });

$single = $source->singleInstance();

// two simultaneous subscriptions, lasting 2 seconds
$single->subscribe($createStdoutObserver('SourceA '));
$single->subscribe($createStdoutObserver('SourceB '));

\Rx\Observable::timer(5000)->subscribe(function () use ($single, &$createStdoutObserver) {
    // resubscribe two times again, more than 5 seconds later,
    // long after the original two subscriptions have ended
    $single->subscribe($createStdoutObserver('SourceC '));
    $single->subscribe($createStdoutObserver('SourceD '));
});

   
Side effect
SourceA Next value: 0
SourceB Next value: 0
Side effect
SourceA Next value: 1
SourceB Next value: 1
SourceA Complete!
SourceB Complete!
Side effect
SourceC Next value: 0
SourceD Next value: 0
Side effect
SourceC Next value: 1
SourceD Next value: 1
SourceC Complete!
SourceD Complete!
    

RxPHP 还包含一个 shareValue 操作符。

返回一个 Observable 序列,它共享对底层序列的单个订阅,并以 initialValue 开头。此操作符是 publishValue 的特化,当观察者数量从零变为一时创建订阅,然后与所有后续观察者共享该订阅,直到观察者数量返回到零,此时订阅被处置。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/shareValue.php

$source = \Rx\Observable::interval(1000)
    ->take(2)
    ->doOnNext(function ($x) {
        echo "Side effect\n";
    });

$published = $source->shareValue(42);

$published->subscribe($createStdoutObserver('SourceA '));
$published->subscribe($createStdoutObserver('SourceB '));

   
SourceA Next value: 42
SourceB Next value: 42
Side effect
SourceA Next value: 0
SourceB Next value: 0
Side effect
SourceA Next value: 1
SourceB Next value: 1
SourceA Complete!
SourceB Complete!
    

待定