一个 可连接的 Observable 类似于一个普通的 Observable,除了它不会在被订阅时开始发出项目,而是在 Connect 操作符应用于它时才开始发出项目。通过这种方式,您可以选择何时让 Observable 开始发出项目。
待定
RxGroovy 将此操作符实现为 publish
。
publish()
还有一个变体接受一个函数作为参数。这个函数将来自源 Observable 的已发出项目作为参数,并生成将在结果 Observable 中发出该项目的替代项。
publish(Func1)
RxJava 将此操作符实现为 publish
。
publish()
还有一个变体接受一个函数作为参数。这个函数将 ConnectableObservable
作为参数,该 ConnectableObservable
共享对基础 Observable 序列的单个订阅。这个函数生成并返回一个新的 Observable 序列。
publish(Func1)
在 RxJS 中,publish
操作符接受一个函数作为参数。这个函数将来自源 Observable 的已发出项目作为参数,并生成将在返回的 ConnectableObservable
中发出的该项目的替代项。
var interval = Rx.Observable.interval(1000); var source = interval .take(2) .doAction(function (x) { console.log('Side effect'); }); var published = source.publish(); published.subscribe(createObserver('SourceA')); published.subscribe(createObserver('SourceB')); var connection = published.connect(); function createObserver(tag) { return Rx.Observer.create( function (x) { console.log('Next: ' + tag + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }
Side effect Next: SourceA0 Next: SourceB0 Side effect Next: SourceA1 Next: SourceB1 Completed
publishValue
操作符除了上面描述的函数之外,还会接收一个初始项目,该项目将在连接时由结果 ConnectableObservable
发出,然后发出来自源 Observable 的项目。但是,它不会将此初始项目发出给在连接时间之后订阅的观察者。
var interval = Rx.Observable.interval(1000); var source = interval .take(2) .doAction(function (x) { console.log('Side effect'); }); var published = source.publishValue(42); published.subscribe(createObserver('SourceA')); published.subscribe(createObserver('SourceB')); var connection = published.connect(); function createObserver(tag) { return Rx.Observer.create( function (x) { console.log('Next: ' + tag + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }
Next: SourceA42 Next: SourceB42 Side effect Next: SourceA0 Next: SourceB0 Side effect Next: SourceA1 Next: SourceB1 Completed Completed
publishLast
操作符类似于 publish
,并接受一个类似行为的函数作为其参数。它与 publish
的不同之处在于,它不是将该函数应用于源 Observable 在连接之后发出的每个项目,并为每个项目发出一个项目,而是仅将该函数应用于源 Observable 终止时正常发出的最后一个项目,并为该项目发出一个项目。
var interval = Rx.Observable.interval(1000); var source = interval .take(2) .doAction(function (x) { console.log('Side effect'); }); var published = source.publishLast(); published.subscribe(createObserver('SourceA')); published.subscribe(createObserver('SourceB')); var connection = published.connect(); function createObserver(tag) { return Rx.Observer.create( function (x) { console.log('Next: ' + tag + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }
Side effect Side effect Next: SourceA1 Completed Next: SourceB1 Completed
以上操作符在以下包中可用
rx.all.js
rx.all.compat.js
rx.binding.js
(需要 rx.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
RxJS 还具有一个 multicast
操作符,它对一个普通的 Observable 进行操作,通过您指定的特定 Subject 对该 Observable 进行多播,将转换函数应用于每次发出,然后将这些转换后的值作为其自身的普通 Observable 序列发出。对这个新 Observable 的每次订阅都将触发对基础多播 Observable 的新订阅。
var subject = new Rx.Subject(); var source = Rx.Observable.range(0, 3) .multicast(subject); var observer = Rx.Observer.create( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); } ); var subscription = source.subscribe(observer); subject.subscribe(observer); var connected = source.connect(); subscription.dispose();
Next: 0 Next: 0 Next: 1 Next: 1 Next: 2 Next: 2 Completed
multicast
操作符在以下包中可用
rx.all.js
rx.all.compat.js
rx.binding.js
(需要 rx.lite.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
还有一个 let
操作符(别名 letBind
可用于 IE9 之前的 Internet Explorer 等浏览器,其中“let
”是被禁止的)。它类似于 multicast
,但不会通过 Subject 对基础 Observable 进行多播
var obs = Rx.Observable.range(1, 3); var source = obs.let(function (o) { return o.concat(o); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 1 Next: 2 Next: 3 Next: 1 Next: 2 Next: 3 Completed
let
(或 letBind
)操作符在以下包中可用
rx.all.js
rx.all.compat.js
rx.experimental.js
它需要以下包之一
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
RxPHP 将此操作符实现为 multicast
。
通过一个实例化的 Subject 将源序列通知多播到选择器函数中序列的所有使用中。对结果序列的每次订阅都会导致一个单独的多播调用,公开选择器函数调用产生的序列。对于具有固定主题类型的专业化,请参阅 Publish、PublishLast 和 Replay。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/multicast/multicast.php $subject = new \Rx\Subject\Subject(); $source = \Rx\Observable::range(0, 3)->multicast($subject); $subscription = $source->subscribe($stdoutObserver); $subject->subscribe($stdoutObserver); $connected = $source->connect();
Next value: 0 Next value: 0 Next value: 1 Next value: 1 Next value: 2 Next value: 2 Complete!
RxPHP 还具有一个 multicastWithSelector
操作符。
通过一个来自主题选择器工厂的实例化的 Subject 将源序列通知多播到选择器函数中序列的所有使用中。对结果序列的每次订阅都会导致一个单独的多播调用,公开选择器函数调用产生的序列。对于具有固定主题类型的专业化,请参阅 Publish、PublishLast 和 Replay。
RxPHP 还具有一个 publish
操作符。
返回一个 Observable 序列,该序列是通过对共享对基础序列的单个订阅的可连接 Observable 序列调用选择器函数的结果。此操作符是使用常规 Subject 的 Multicast 的专业化。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publish.php /* With publish */ $interval = \Rx\Observable::range(0, 10); $source = $interval ->take(2) ->doOnNext(function ($x) { echo "Side effect\n"; }); $published = $source->publish(); $published->subscribe($createStdoutObserver('SourceC ')); $published->subscribe($createStdoutObserver('SourceD ')); $published->connect();
Side effect SourceC Next value: 0 SourceD Next value: 0 Side effect SourceC Next value: 1 SourceD Next value: 1 SourceC Complete! SourceD Complete!
RxPHP 还具有一个 publishLast
操作符。
返回一个 Observable 序列,该序列是通过对共享对基础序列的单个订阅的可连接 Observable 序列调用选择器函数的结果,其中只包含最后一个通知。此操作符是使用 AsyncSubject 的 Multicast 的专业化。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publishLast.php $range = \Rx\Observable::fromArray(range(0, 1000)); $source = $range ->take(2) ->doOnNext(function ($x) { echo "Side effect\n"; }); $published = $source->publishLast(); $published->subscribe($createStdoutObserver('SourceA')); $published->subscribe($createStdoutObserver('SourceB')); $connection = $published->connect();
Side effect Side effect SourceANext value: 1 SourceBNext value: 1 SourceAComplete! SourceBComplete!
RxPHP 还具有一个 publishValue
操作符。
返回一个 Observable 序列,该序列是通过对共享对基础序列的单个订阅的可连接 Observable 序列调用选择器函数的结果,并且从 initialValue 开始。此操作符是使用 BehaviorSubject 的 Multicast 的专业化。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publishValue.php $range = \Rx\Observable::fromArray(range(0, 1000)); $source = $range ->take(2) ->doOnNext(function ($x) { echo "Side effect\n"; }); $published = $source->publishValue(42); $published->subscribe($createStdoutObserver('SourceA')); $published->subscribe($createStdoutObserver('SourceB')); $connection = $published->connect();
SourceANext value: 42 SourceBNext value: 42 Side effect SourceANext value: 0 SourceBNext value: 0 Side effect SourceANext value: 1 SourceBNext value: 1 SourceAComplete! SourceBComplete!
待定