创建

通过函数从头开始创建可观察对象

Create

可以使用 Create 操作符从头开始创建可观察对象。将此操作符传递给一个函数,该函数将观察者作为其参数。编写此函数以使其表现得像可观察对象一样 - 通过适当地调用观察者的 onNextonErroronCompleted 方法。

一个格式良好的有限可观察对象必须尝试恰好调用一次观察者的 onCompleted 方法或恰好调用一次其 onError 方法,并且此后不得尝试调用观察者的任何其他方法。

另请参阅

特定于语言的信息

待定

create

RxGroovy 将此操作符实现为 create

示例代码

def myObservable = Observable.create({ aSubscriber ->
  try {
    for (int i = 1; i < 1000000; i++) {
      if (aSubscriber.isUnsubscribed()) {
        return;
      }
      aSubscriber.onNext(i);
    }
    if (!aSubscriber.isUnsubscribed()) {
      aSubscriber.onCompleted();
    }
  } catch(Throwable t) {
    if (!aSubscriber.isUnsubscribed()) {
      aSubscriber.onError(t);
    }
  }
})

最好检查观察者的 isUnsubscribed 状态,以便您的可观察对象在不再有感兴趣的观察者时停止发出项目或进行昂贵的计算。

create 默认情况下不会在任何特定的 调度器 上运行。

create

RxJava 将此操作符实现为 create

最好检查观察者的 isUnsubscribed 状态,以便您的可观察对象在不再有感兴趣的观察者时停止发出项目或进行昂贵的计算。

示例代码

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> observer) {
        try {
            if (!observer.isUnsubscribed()) {
                for (int i = 1; i < 5; i++) {
                    observer.onNext(i);
                }
                observer.onCompleted();
            }
        } catch (Exception e) {
            observer.onError(e);
        }
    }
 } ).subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.

create 默认情况下不会在任何特定的 调度器 上运行。

create

RxJS 将此操作符实现为 create(该操作符还有一个替代名称:createWithDisposable)。

示例代码

/* Using a function */
var source = Rx.Observable.create(function (observer) {
    observer.onNext(42);
    observer.onCompleted();

    // Note that this is optional, you do not have to return this if you require no cleanup
    return function () { console.log('disposed'); };
});

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 42
Completed
/* Using a disposable */
var source = Rx.Observable.create(function (observer) {
    observer.onNext(42);
    observer.onCompleted();

    // Note that this is optional, you do not have to return this if you require no cleanup
    return Rx.Disposable.create(function () {
        console.log('disposed');
    });
});

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 42
Completed

create 存在于以下发行版中

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
generate

可以使用 generate 操作符创建简单的可观察对象,这些对象可以生成其下一个发射,并且可以根据先前发射的值确定何时终止。generate 的基本形式采用四个参数

  1. 第一个要发射的项目
  2. 一个函数,用于测试一个项目以确定是否要发射它(true)或终止可观察对象(false
  3. 一个函数,用于根据先前项目的 value 生成要测试和发射的下一个项目
  4. 一个函数,用于在发射之前转换项目

还可以将 调度器 作为可选的第五个参数传递,generate 将使用它来创建和发出其序列(默认情况下使用 currentThread)。

示例代码

var source = Rx.Observable.generate(
    0,
    function (x) { return x < 3; },
    function (x) { return x + 1; },
    function (x) { return x; }
);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 0
Next: 1
Next: 2
Completed

generate 存在于以下发行版中

  • rx.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
generateWithRelativeTime

可以使用 generateWithRelativeTime 操作符创建简单的可观察对象,这些对象可以生成其下一个发射,并且可以根据先前发射的值确定何时终止。generateWithRelativeTime 的基本形式采用五个参数

  1. 第一个要发射的项目
  2. 一个函数,用于测试一个项目以确定是否要发射它(true)或终止可观察对象(false
  3. 一个函数,用于根据先前项目的 value 生成要测试和发射的下一个项目
  4. 一个函数,用于在发射之前转换项目
  5. 一个函数,指示在发射先前项目后,生成器应该等待多长时间(以毫秒为单位),然后再发射此项目

还可以将 调度器 作为可选的第六个参数传递,generate 将使用它来创建和发出其序列(默认情况下使用 currentThread)。

示例代码

var source = Rx.Observable.generateWithRelativeTime(
    1,
    function (x) { return x < 4; },
    function (x) { return x + 1; },
    function (x) { return x; },
    function (x) { return 100 * x; }
).timeInterval();

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: {value: 1, interval: 100}
Next: {value: 2, interval: 200}
Next: {value: 3, interval: 300}
Completed

generateWithRelativeTime 存在于以下发行版中

  • rx.lite.js
  • rx.lite.compat.js
  • rx.time.js(需要 rx.jsrx.compat.js
generateWithAbsoluteTime

可以使用 generateWithAbsoluteTime 操作符创建简单的可观察对象,这些对象可以生成其下一个发射,并且可以根据先前发射的值确定何时终止。generateWithAbsoluteTime 的基本形式采用五个参数

  1. 第一个要发射的项目
  2. 一个函数,用于测试一个项目以确定是否要发射它(true)或终止可观察对象(false
  3. 一个函数,用于根据先前项目的 value 生成要测试和发射的下一个项目
  4. 一个函数,用于在发射之前转换项目
  5. 一个函数,指示生成器应该在什么时间(表示为 Date)发射新项目

还可以将 调度器 作为可选的第六个参数传递,generate 将使用它来创建和发出其序列(默认情况下使用 currentThread)。

示例代码

var source = Rx.Observable.generate(
    1,
    function (x) { return x < 4; },
    function (x) { return x + 1; },
    function (x) { return x; },
    function (x) { return Date.now() + (100 * x); }
).timeInterval();

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: {value: 1, interval: 100}
Next: {value: 2, interval: 200}
Next: {value: 3, interval: 300}
Completed

generateWithAbsoluteTime 存在于以下发行版中

  • rx.time.js(需要 rx.jsrx.compat.js

RxPHP 将此操作符实现为 create

从指定的 subscribeAction 可调用实现创建可观察对象序列。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/create/create.php

//With static method
$source = \Rx\Observable::create(function (\Rx\ObserverInterface $observer) {
    $observer->onNext(42);
    $observer->onCompleted();

    return new CallbackDisposable(function () {
        echo "Disposed\n";
    });
});

$subscription = $source->subscribe($createStdoutObserver());

   
Next value: 42
Complete!
Disposed
    
create

RxSwift 将此操作符实现为 create

示例代码

let source : Observable = Observable.create { observer in
    for i in 1...5 {
        observer.on(.next(i))
    }
    observer.on(.completed)

    // Note that this is optional. If you require no cleanup you can return
    // `Disposables.create()` (which returns the `NopDisposable` singleton)
    return Disposables.create {
        print("disposed")
    }
}

source.subscribe {
    print($0)
}
next(1)
next(2)
next(3)
next(4)
next(5)
completed
disposed
generate

可以使用 generate 操作符创建简单的可观察对象,这些对象可以生成其下一个发射,并且可以根据先前发射的值确定何时终止。generate 的基本形式采用三个参数

  1. 第一个要发射的项目
  2. 一个函数,用于测试一个项目以确定是否要发射它(true)或终止可观察对象(false
  3. 一个函数,用于根据先前项目的 value 生成要测试和发射的下一个项目

还可以将 调度器 作为可选的第四个参数传递,generate 将使用它来创建和发出其序列(默认情况下使用 CurrentThreadScheduler)。

示例代码

let source = Observable.generate(
   initialState: 0,
   condition: { $0 < 3 },
   iterate: { $0 + 1 }
)

source.subscribe {
   print($0)
}
next(0)
next(1)
next(2)
completed