Publish

将一个普通的 Observable 转换为一个可连接的 Observable

Publish

一个 可连接的 Observable 类似于一个普通的 Observable,除了它不会在被订阅时开始发出项目,而是在 Connect 操作符应用于它时才开始发出项目。通过这种方式,您可以选择何时让 Observable 开始发出项目。

另见

特定语言信息

待定

publish

RxGroovy 将此操作符实现为 publish

publish

还有一个变体接受一个函数作为参数。这个函数将来自源 Observable 的已发出项目作为参数,并生成将在结果 Observable 中发出该项目的替代项。

publish

RxJava 将此操作符实现为 publish

publish

还有一个变体接受一个函数作为参数。这个函数将 ConnectableObservable 作为参数,该 ConnectableObservable 共享对基础 Observable 序列的单个订阅。这个函数生成并返回一个新的 Observable 序列。

publish

在 RxJS 中,publish 操作符接受一个函数作为参数。这个函数将来自源 Observable 的已发出项目作为参数,并生成将在返回的 ConnectableObservable 中发出的该项目的替代项。

示例代码

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .doAction(function (x) {
        console.log('Side effect');
    });

var published = source.publish();

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

var connection = published.connect();

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Completed
publishValue

publishValue 操作符除了上面描述的函数之外,还会接收一个初始项目,该项目将在连接时由结果 ConnectableObservable 发出,然后发出来自源 Observable 的项目。但是,它不会将此初始项目发出给在连接时间之后订阅的观察者。

示例代码

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .doAction(function (x) {
        console.log('Side effect');
    });

var published = source.publishValue(42);

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

var connection = published.connect();

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Next: SourceA42
Next: SourceB42
Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Completed
Completed
publishLast

publishLast 操作符类似于 publish,并接受一个类似行为的函数作为其参数。它与 publish 的不同之处在于,它不是将该函数应用于源 Observable 在连接之后发出的每个项目,并为每个项目发出一个项目,而是仅将该函数应用于源 Observable 终止时正常发出的最后一个项目,并为该项目发出一个项目。

示例代码

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .doAction(function (x) {
        console.log('Side effect');
    });

var published = source.publishLast();

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

var connection = published.connect();

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Side effect
Side effect
Next: SourceA1
Completed
Next: SourceB1
Completed

以上操作符在以下包中可用

  • rx.all.js
  • rx.all.compat.js
  • rx.binding.js(需要 rx.jsrx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

RxJS 还具有一个 multicast 操作符,它对一个普通的 Observable 进行操作,通过您指定的特定 Subject 对该 Observable 进行多播,将转换函数应用于每次发出,然后将这些转换后的值作为其自身的普通 Observable 序列发出。对这个新 Observable 的每次订阅都将触发对基础多播 Observable 的新订阅。

示例代码

var subject = new Rx.Subject();
var source = Rx.Observable.range(0, 3)
    .multicast(subject);

var observer = Rx.Observer.create(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); }
);

var subscription = source.subscribe(observer);
subject.subscribe(observer);

var connected = source.connect();

subscription.dispose();
Next: 0
Next: 0
Next: 1
Next: 1
Next: 2
Next: 2
Completed

multicast 操作符在以下包中可用

  • rx.all.js
  • rx.all.compat.js
  • rx.binding.js(需要 rx.lite.jsrx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

还有一个 let 操作符(别名 letBind 可用于 IE9 之前的 Internet Explorer 等浏览器,其中“let”是被禁止的)。它类似于 multicast,但不会通过 Subject 对基础 Observable 进行多播

示例代码

var obs = Rx.Observable.range(1, 3);

var source = obs.let(function (o) { return o.concat(o); });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 1
Next: 2
Next: 3
Next: 1
Next: 2
Next: 3
Completed

let(或 letBind)操作符在以下包中可用

  • rx.all.js
  • rx.all.compat.js
  • rx.experimental.js

它需要以下包之一

  • rx.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

RxPHP 将此操作符实现为 multicast

通过一个实例化的 Subject 将源序列通知多播到选择器函数中序列的所有使用中。对结果序列的每次订阅都会导致一个单独的多播调用,公开选择器函数调用产生的序列。对于具有固定主题类型的专业化,请参阅 Publish、PublishLast 和 Replay。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/multicast/multicast.php

$subject = new \Rx\Subject\Subject();
$source  = \Rx\Observable::range(0, 3)->multicast($subject);

$subscription = $source->subscribe($stdoutObserver);
$subject->subscribe($stdoutObserver);

$connected = $source->connect();

   
Next value: 0
Next value: 0
Next value: 1
Next value: 1
Next value: 2
Next value: 2
Complete!
    

RxPHP 还具有一个 multicastWithSelector 操作符。

通过一个来自主题选择器工厂的实例化的 Subject 将源序列通知多播到选择器函数中序列的所有使用中。对结果序列的每次订阅都会导致一个单独的多播调用,公开选择器函数调用产生的序列。对于具有固定主题类型的专业化,请参阅 Publish、PublishLast 和 Replay。

RxPHP 还具有一个 publish 操作符。

返回一个 Observable 序列,该序列是通过对共享对基础序列的单个订阅的可连接 Observable 序列调用选择器函数的结果。此操作符是使用常规 Subject 的 Multicast 的专业化。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publish.php

/* With publish */
$interval = \Rx\Observable::range(0, 10);

$source = $interval
    ->take(2)
    ->doOnNext(function ($x) {
        echo "Side effect\n";
    });

$published = $source->publish();

$published->subscribe($createStdoutObserver('SourceC '));
$published->subscribe($createStdoutObserver('SourceD '));

$published->connect();

   
Side effect
SourceC Next value: 0
SourceD Next value: 0
Side effect
SourceC Next value: 1
SourceD Next value: 1
SourceC Complete!
SourceD Complete!
    

RxPHP 还具有一个 publishLast 操作符。

返回一个 Observable 序列,该序列是通过对共享对基础序列的单个订阅的可连接 Observable 序列调用选择器函数的结果,其中只包含最后一个通知。此操作符是使用 AsyncSubject 的 Multicast 的专业化。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publishLast.php

$range = \Rx\Observable::fromArray(range(0, 1000));

$source = $range
    ->take(2)
    ->doOnNext(function ($x) {
        echo "Side effect\n";
    });

$published = $source->publishLast();

$published->subscribe($createStdoutObserver('SourceA'));
$published->subscribe($createStdoutObserver('SourceB'));

$connection = $published->connect();

   
Side effect
Side effect
SourceANext value: 1
SourceBNext value: 1
SourceAComplete!
SourceBComplete!
    

RxPHP 还具有一个 publishValue 操作符。

返回一个 Observable 序列,该序列是通过对共享对基础序列的单个订阅的可连接 Observable 序列调用选择器函数的结果,并且从 initialValue 开始。此操作符是使用 BehaviorSubject 的 Multicast 的专业化。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publishValue.php

$range = \Rx\Observable::fromArray(range(0, 1000));

$source = $range
    ->take(2)
    ->doOnNext(function ($x) {
        echo "Side effect\n";
    });

$published = $source->publishValue(42);

$published->subscribe($createStdoutObserver('SourceA'));
$published->subscribe($createStdoutObserver('SourceB'));

$connection = $published->connect();

   
SourceANext value: 42
SourceBNext value: 42
Side effect
SourceANext value: 0
SourceBNext value: 0
Side effect
SourceANext value: 1
SourceBNext value: 1
SourceAComplete!
SourceBComplete!
    

待定