一个 可连接的 Observable 类似于一个普通的 Observable,除了它不会在被订阅时开始发出项目,而是在 Connect 操作符应用于它时才开始发出项目。通过这种方式,您可以选择何时让 Observable 开始发出项目。
待定
RxGroovy 将此操作符实现为 publish。
publish()
还有一个变体接受一个函数作为参数。这个函数将来自源 Observable 的已发出项目作为参数,并生成将在结果 Observable 中发出该项目的替代项。
publish(Func1)
RxJava 将此操作符实现为 publish。
publish()
还有一个变体接受一个函数作为参数。这个函数将 ConnectableObservable 作为参数,该 ConnectableObservable 共享对基础 Observable 序列的单个订阅。这个函数生成并返回一个新的 Observable 序列。
publish(Func1)
在 RxJS 中,publish 操作符接受一个函数作为参数。这个函数将来自源 Observable 的已发出项目作为参数,并生成将在返回的 ConnectableObservable 中发出的该项目的替代项。
var interval = Rx.Observable.interval(1000);
var source = interval
.take(2)
.doAction(function (x) {
console.log('Side effect');
});
var published = source.publish();
published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));
var connection = published.connect();
function createObserver(tag) {
return Rx.Observer.create(
function (x) { console.log('Next: ' + tag + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); });
}Side effect Next: SourceA0 Next: SourceB0 Side effect Next: SourceA1 Next: SourceB1 Completed
publishValue 操作符除了上面描述的函数之外,还会接收一个初始项目,该项目将在连接时由结果 ConnectableObservable 发出,然后发出来自源 Observable 的项目。但是,它不会将此初始项目发出给在连接时间之后订阅的观察者。
var interval = Rx.Observable.interval(1000);
var source = interval
.take(2)
.doAction(function (x) {
console.log('Side effect');
});
var published = source.publishValue(42);
published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));
var connection = published.connect();
function createObserver(tag) {
return Rx.Observer.create(
function (x) { console.log('Next: ' + tag + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); });
}Next: SourceA42 Next: SourceB42 Side effect Next: SourceA0 Next: SourceB0 Side effect Next: SourceA1 Next: SourceB1 Completed Completed
publishLast 操作符类似于 publish,并接受一个类似行为的函数作为其参数。它与 publish 的不同之处在于,它不是将该函数应用于源 Observable 在连接之后发出的每个项目,并为每个项目发出一个项目,而是仅将该函数应用于源 Observable 终止时正常发出的最后一个项目,并为该项目发出一个项目。
var interval = Rx.Observable.interval(1000);
var source = interval
.take(2)
.doAction(function (x) {
console.log('Side effect');
});
var published = source.publishLast();
published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));
var connection = published.connect();
function createObserver(tag) {
return Rx.Observer.create(
function (x) { console.log('Next: ' + tag + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); });
}Side effect Side effect Next: SourceA1 Completed Next: SourceB1 Completed
以上操作符在以下包中可用
rx.all.jsrx.all.compat.jsrx.binding.js(需要 rx.js 或 rx.compat.js)rx.lite.jsrx.lite.compat.jsRxJS 还具有一个 multicast 操作符,它对一个普通的 Observable 进行操作,通过您指定的特定 Subject 对该 Observable 进行多播,将转换函数应用于每次发出,然后将这些转换后的值作为其自身的普通 Observable 序列发出。对这个新 Observable 的每次订阅都将触发对基础多播 Observable 的新订阅。
var subject = new Rx.Subject();
var source = Rx.Observable.range(0, 3)
.multicast(subject);
var observer = Rx.Observer.create(
function (x) { console.log('Next: ' + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); }
);
var subscription = source.subscribe(observer);
subject.subscribe(observer);
var connected = source.connect();
subscription.dispose();Next: 0 Next: 0 Next: 1 Next: 1 Next: 2 Next: 2 Completed
multicast 操作符在以下包中可用
rx.all.jsrx.all.compat.jsrx.binding.js(需要 rx.lite.js 或 rx.compat.js)rx.lite.jsrx.lite.compat.js还有一个 let 操作符(别名 letBind 可用于 IE9 之前的 Internet Explorer 等浏览器,其中“let”是被禁止的)。它类似于 multicast,但不会通过 Subject 对基础 Observable 进行多播
var obs = Rx.Observable.range(1, 3);
var source = obs.let(function (o) { return o.concat(o); });
var subscription = source.subscribe(
function (x) { console.log('Next: ' + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); });
var subscription = source.subscribe(
function (x) { console.log('Next: ' + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); });Next: 1 Next: 2 Next: 3 Next: 1 Next: 2 Next: 3 Completed
let(或 letBind)操作符在以下包中可用
rx.all.jsrx.all.compat.jsrx.experimental.js它需要以下包之一
rx.jsrx.compat.jsrx.lite.jsrx.lite.compat.jsRxPHP 将此操作符实现为 multicast。
通过一个实例化的 Subject 将源序列通知多播到选择器函数中序列的所有使用中。对结果序列的每次订阅都会导致一个单独的多播调用,公开选择器函数调用产生的序列。对于具有固定主题类型的专业化,请参阅 Publish、PublishLast 和 Replay。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/multicast/multicast.php $subject = new \Rx\Subject\Subject(); $source = \Rx\Observable::range(0, 3)->multicast($subject); $subscription = $source->subscribe($stdoutObserver); $subject->subscribe($stdoutObserver); $connected = $source->connect();
Next value: 0
Next value: 0
Next value: 1
Next value: 1
Next value: 2
Next value: 2
Complete!
RxPHP 还具有一个 multicastWithSelector 操作符。
通过一个来自主题选择器工厂的实例化的 Subject 将源序列通知多播到选择器函数中序列的所有使用中。对结果序列的每次订阅都会导致一个单独的多播调用,公开选择器函数调用产生的序列。对于具有固定主题类型的专业化,请参阅 Publish、PublishLast 和 Replay。
RxPHP 还具有一个 publish 操作符。
返回一个 Observable 序列,该序列是通过对共享对基础序列的单个订阅的可连接 Observable 序列调用选择器函数的结果。此操作符是使用常规 Subject 的 Multicast 的专业化。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publish.php
/* With publish */
$interval = \Rx\Observable::range(0, 10);
$source = $interval
->take(2)
->doOnNext(function ($x) {
echo "Side effect\n";
});
$published = $source->publish();
$published->subscribe($createStdoutObserver('SourceC '));
$published->subscribe($createStdoutObserver('SourceD '));
$published->connect();
Side effect
SourceC Next value: 0
SourceD Next value: 0
Side effect
SourceC Next value: 1
SourceD Next value: 1
SourceC Complete!
SourceD Complete!
RxPHP 还具有一个 publishLast 操作符。
返回一个 Observable 序列,该序列是通过对共享对基础序列的单个订阅的可连接 Observable 序列调用选择器函数的结果,其中只包含最后一个通知。此操作符是使用 AsyncSubject 的 Multicast 的专业化。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publishLast.php
$range = \Rx\Observable::fromArray(range(0, 1000));
$source = $range
->take(2)
->doOnNext(function ($x) {
echo "Side effect\n";
});
$published = $source->publishLast();
$published->subscribe($createStdoutObserver('SourceA'));
$published->subscribe($createStdoutObserver('SourceB'));
$connection = $published->connect();
Side effect
Side effect
SourceANext value: 1
SourceBNext value: 1
SourceAComplete!
SourceBComplete!
RxPHP 还具有一个 publishValue 操作符。
返回一个 Observable 序列,该序列是通过对共享对基础序列的单个订阅的可连接 Observable 序列调用选择器函数的结果,并且从 initialValue 开始。此操作符是使用 BehaviorSubject 的 Multicast 的专业化。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publishValue.php
$range = \Rx\Observable::fromArray(range(0, 1000));
$source = $range
->take(2)
->doOnNext(function ($x) {
echo "Side effect\n";
});
$published = $source->publishValue(42);
$published->subscribe($createStdoutObserver('SourceA'));
$published->subscribe($createStdoutObserver('SourceB'));
$connection = $published->connect();
SourceANext value: 42
SourceBNext value: 42
Side effect
SourceANext value: 0
SourceBNext value: 0
Side effect
SourceANext value: 1
SourceBNext value: 1
SourceAComplete!
SourceBComplete!
待定