可以使用 Create 操作符从头开始创建可观察对象。将此操作符传递给一个函数,该函数将观察者作为其参数。编写此函数以使其表现得像可观察对象一样 - 通过适当地调用观察者的 onNext、onError 和 onCompleted 方法。
一个格式良好的有限可观察对象必须尝试恰好调用一次观察者的 onCompleted 方法或恰好调用一次其 onError 方法,并且此后不得尝试调用观察者的任何其他方法。
待定
RxGroovy 将此操作符实现为 create。
def myObservable = Observable.create({ aSubscriber ->
try {
for (int i = 1; i < 1000000; i++) {
if (aSubscriber.isUnsubscribed()) {
return;
}
aSubscriber.onNext(i);
}
if (!aSubscriber.isUnsubscribed()) {
aSubscriber.onCompleted();
}
} catch(Throwable t) {
if (!aSubscriber.isUnsubscribed()) {
aSubscriber.onError(t);
}
}
})最好检查观察者的 isUnsubscribed 状态,以便您的可观察对象在不再有感兴趣的观察者时停止发出项目或进行昂贵的计算。
create 默认情况下不会在任何特定的 调度器 上运行。
create(OnSubscribe)
RxJava 将此操作符实现为 create。
最好检查观察者的 isUnsubscribed 状态,以便您的可观察对象在不再有感兴趣的观察者时停止发出项目或进行昂贵的计算。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
} ).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});Next: 1 Next: 2 Next: 3 Next: 4 Sequence complete.
create 默认情况下不会在任何特定的 调度器 上运行。
create(OnSubscribe)
RxJS 将此操作符实现为 create(该操作符还有一个替代名称:createWithDisposable)。
/* Using a function */
var source = Rx.Observable.create(function (observer) {
observer.onNext(42);
observer.onCompleted();
// Note that this is optional, you do not have to return this if you require no cleanup
return function () { console.log('disposed'); };
});
var subscription = source.subscribe(
function (x) { console.log('Next: ' + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); });Next: 42 Completed
/* Using a disposable */
var source = Rx.Observable.create(function (observer) {
observer.onNext(42);
observer.onCompleted();
// Note that this is optional, you do not have to return this if you require no cleanup
return Rx.Disposable.create(function () {
console.log('disposed');
});
});
var subscription = source.subscribe(
function (x) { console.log('Next: ' + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); });Next: 42 Completed
create 存在于以下发行版中
rx.jsrx.all.jsrx.all.compat.jsrx.compat.jsrx.lite.jsrx.lite.compat.js
可以使用 generate 操作符创建简单的可观察对象,这些对象可以生成其下一个发射,并且可以根据先前发射的值确定何时终止。generate 的基本形式采用四个参数
true)或终止可观察对象(false)还可以将 调度器 作为可选的第五个参数传递,generate 将使用它来创建和发出其序列(默认情况下使用 currentThread)。
var source = Rx.Observable.generate(
0,
function (x) { return x < 3; },
function (x) { return x + 1; },
function (x) { return x; }
);
var subscription = source.subscribe(
function (x) { console.log('Next: ' + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); });Next: 0 Next: 1 Next: 2 Completed
generate 存在于以下发行版中
rx.jsrx.compat.jsrx.lite.jsrx.lite.compat.js
可以使用 generateWithRelativeTime 操作符创建简单的可观察对象,这些对象可以生成其下一个发射,并且可以根据先前发射的值确定何时终止。generateWithRelativeTime 的基本形式采用五个参数
true)或终止可观察对象(false)还可以将 调度器 作为可选的第六个参数传递,generate 将使用它来创建和发出其序列(默认情况下使用 currentThread)。
var source = Rx.Observable.generateWithRelativeTime(
1,
function (x) { return x < 4; },
function (x) { return x + 1; },
function (x) { return x; },
function (x) { return 100 * x; }
).timeInterval();
var subscription = source.subscribe(
function (x) { console.log('Next: ' + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); });Next: {value: 1, interval: 100}
Next: {value: 2, interval: 200}
Next: {value: 3, interval: 300}
Completed
generateWithRelativeTime 存在于以下发行版中
rx.lite.jsrx.lite.compat.jsrx.time.js(需要 rx.js 或 rx.compat.js)
可以使用 generateWithAbsoluteTime 操作符创建简单的可观察对象,这些对象可以生成其下一个发射,并且可以根据先前发射的值确定何时终止。generateWithAbsoluteTime 的基本形式采用五个参数
true)或终止可观察对象(false)Date)发射新项目还可以将 调度器 作为可选的第六个参数传递,generate 将使用它来创建和发出其序列(默认情况下使用 currentThread)。
var source = Rx.Observable.generate(
1,
function (x) { return x < 4; },
function (x) { return x + 1; },
function (x) { return x; },
function (x) { return Date.now() + (100 * x); }
).timeInterval();
var subscription = source.subscribe(
function (x) { console.log('Next: ' + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); });Next: {value: 1, interval: 100}
Next: {value: 2, interval: 200}
Next: {value: 3, interval: 300}
Completed
generateWithAbsoluteTime 存在于以下发行版中
rx.time.js(需要 rx.js 或 rx.compat.js)RxPHP 将此操作符实现为 create。
从指定的 subscribeAction 可调用实现创建可观察对象序列。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/create/create.php
//With static method
$source = \Rx\Observable::create(function (\Rx\ObserverInterface $observer) {
$observer->onNext(42);
$observer->onCompleted();
return new CallbackDisposable(function () {
echo "Disposed\n";
});
});
$subscription = $source->subscribe($createStdoutObserver());
Next value: 42
Complete!
Disposed
RxSwift 将此操作符实现为 create。
let source : Observable= Observable.create { observer in for i in 1...5 { observer.on(.next(i)) } observer.on(.completed) // Note that this is optional. If you require no cleanup you can return // `Disposables.create()` (which returns the `NopDisposable` singleton) return Disposables.create { print("disposed") } } source.subscribe { print($0) }
next(1) next(2) next(3) next(4) next(5) completed disposed
可以使用 generate 操作符创建简单的可观察对象,这些对象可以生成其下一个发射,并且可以根据先前发射的值确定何时终止。generate 的基本形式采用三个参数
true)或终止可观察对象(false)还可以将 调度器 作为可选的第四个参数传递,generate 将使用它来创建和发出其序列(默认情况下使用 CurrentThreadScheduler)。
let source = Observable.generate( initialState: 0, condition: { $0 < 3 }, iterate: { $0 + 1 } ) source.subscribe { print($0) }
next(0) next(1) next(2) completed