可以使用 Create 操作符从头开始创建可观察对象。将此操作符传递给一个函数,该函数将观察者作为其参数。编写此函数以使其表现得像可观察对象一样 - 通过适当地调用观察者的 onNext
、onError
和 onCompleted
方法。
一个格式良好的有限可观察对象必须尝试恰好调用一次观察者的 onCompleted
方法或恰好调用一次其 onError
方法,并且此后不得尝试调用观察者的任何其他方法。
待定
RxGroovy 将此操作符实现为 create
。
def myObservable = Observable.create({ aSubscriber -> try { for (int i = 1; i < 1000000; i++) { if (aSubscriber.isUnsubscribed()) { return; } aSubscriber.onNext(i); } if (!aSubscriber.isUnsubscribed()) { aSubscriber.onCompleted(); } } catch(Throwable t) { if (!aSubscriber.isUnsubscribed()) { aSubscriber.onError(t); } } })
最好检查观察者的 isUnsubscribed
状态,以便您的可观察对象在不再有感兴趣的观察者时停止发出项目或进行昂贵的计算。
create
默认情况下不会在任何特定的 调度器 上运行。
create(OnSubscribe)
RxJava 将此操作符实现为 create
。
最好检查观察者的 isUnsubscribed
状态,以便您的可观察对象在不再有感兴趣的观察者时停止发出项目或进行昂贵的计算。
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> observer) { try { if (!observer.isUnsubscribed()) { for (int i = 1; i < 5; i++) { observer.onNext(i); } observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
Next: 1 Next: 2 Next: 3 Next: 4 Sequence complete.
create
默认情况下不会在任何特定的 调度器 上运行。
create(OnSubscribe)
RxJS 将此操作符实现为 create
(该操作符还有一个替代名称:createWithDisposable
)。
/* Using a function */ var source = Rx.Observable.create(function (observer) { observer.onNext(42); observer.onCompleted(); // Note that this is optional, you do not have to return this if you require no cleanup return function () { console.log('disposed'); }; }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 42 Completed
/* Using a disposable */ var source = Rx.Observable.create(function (observer) { observer.onNext(42); observer.onCompleted(); // Note that this is optional, you do not have to return this if you require no cleanup return Rx.Disposable.create(function () { console.log('disposed'); }); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 42 Completed
create
存在于以下发行版中
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
可以使用 generate
操作符创建简单的可观察对象,这些对象可以生成其下一个发射,并且可以根据先前发射的值确定何时终止。generate
的基本形式采用四个参数
true
)或终止可观察对象(false
)还可以将 调度器 作为可选的第五个参数传递,generate
将使用它来创建和发出其序列(默认情况下使用 currentThread
)。
var source = Rx.Observable.generate( 0, function (x) { return x < 3; }, function (x) { return x + 1; }, function (x) { return x; } ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 0 Next: 1 Next: 2 Completed
generate
存在于以下发行版中
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
可以使用 generateWithRelativeTime
操作符创建简单的可观察对象,这些对象可以生成其下一个发射,并且可以根据先前发射的值确定何时终止。generateWithRelativeTime
的基本形式采用五个参数
true
)或终止可观察对象(false
)还可以将 调度器 作为可选的第六个参数传递,generate
将使用它来创建和发出其序列(默认情况下使用 currentThread
)。
var source = Rx.Observable.generateWithRelativeTime( 1, function (x) { return x < 4; }, function (x) { return x + 1; }, function (x) { return x; }, function (x) { return 100 * x; } ).timeInterval(); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: {value: 1, interval: 100} Next: {value: 2, interval: 200} Next: {value: 3, interval: 300} Completed
generateWithRelativeTime
存在于以下发行版中
rx.lite.js
rx.lite.compat.js
rx.time.js
(需要 rx.js
或 rx.compat.js
)可以使用 generateWithAbsoluteTime
操作符创建简单的可观察对象,这些对象可以生成其下一个发射,并且可以根据先前发射的值确定何时终止。generateWithAbsoluteTime
的基本形式采用五个参数
true
)或终止可观察对象(false
)Date
)发射新项目还可以将 调度器 作为可选的第六个参数传递,generate
将使用它来创建和发出其序列(默认情况下使用 currentThread
)。
var source = Rx.Observable.generate( 1, function (x) { return x < 4; }, function (x) { return x + 1; }, function (x) { return x; }, function (x) { return Date.now() + (100 * x); } ).timeInterval(); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: {value: 1, interval: 100} Next: {value: 2, interval: 200} Next: {value: 3, interval: 300} Completed
generateWithAbsoluteTime
存在于以下发行版中
rx.time.js
(需要 rx.js
或 rx.compat.js
)RxPHP 将此操作符实现为 create
。
从指定的 subscribeAction 可调用实现创建可观察对象序列。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/create/create.php //With static method $source = \Rx\Observable::create(function (\Rx\ObserverInterface $observer) { $observer->onNext(42); $observer->onCompleted(); return new CallbackDisposable(function () { echo "Disposed\n"; }); }); $subscription = $source->subscribe($createStdoutObserver());
Next value: 42 Complete! Disposed
RxSwift 将此操作符实现为 create
。
let source : Observable= Observable.create { observer in for i in 1...5 { observer.on(.next(i)) } observer.on(.completed) // Note that this is optional. If you require no cleanup you can return // `Disposables.create()` (which returns the `NopDisposable` singleton) return Disposables.create { print("disposed") } } source.subscribe { print($0) }
next(1) next(2) next(3) next(4) next(5) completed disposed
可以使用 generate
操作符创建简单的可观察对象,这些对象可以生成其下一个发射,并且可以根据先前发射的值确定何时终止。generate
的基本形式采用三个参数
true
)或终止可观察对象(false
)还可以将 调度器 作为可选的第四个参数传递,generate
将使用它来创建和发出其序列(默认情况下使用 CurrentThreadScheduler
)。
let source = Observable.generate( initialState: 0, condition: { $0 < 3 }, iterate: { $0 + 1 } ) source.subscribe { print($0) }
next(0) next(1) next(2) completed