编程语言有很多方法来获取计算结果,比如函数、future、操作、可调用对象、可运行对象等等。这些方法被归类在 Start 操作符类别下,让它们的行为类似于 Observable,以便它们可以在 Observable 级联中与其他 Observable 链式调用。
待定
待定
各种 RxGroovy 实现的 Start 位于可选的 rxjava-async
模块中。
rxjava-async
模块包含 start
操作符,它接受一个函数作为参数,调用该函数来检索一个值,然后返回一个 Observable,它将该值发射到每个后续观察者。
注意,即使多个观察者订阅了生成的 Observable,该函数也只执行一次。
rxjava-async
模块还包含 toAsync
、asyncAction
和 asyncFunc
操作符。这些操作符接受一个函数或一个 Action 作为参数。对于函数,这种变体操作符调用该函数来检索一个值,然后返回一个 Observable,它将该值发射到每个后续观察者(就像 start
操作符一样)。
对于 Action,过程类似,但没有返回值。在这种情况下,由该操作符创建的 Observable 将在终止之前发射一个 null
。
注意,即使多个观察者订阅了生成的 Observable,该函数或 Action 也只执行一次。
rxjava-async
模块还包含 startFuture
操作符。您将一个返回 Future
的函数传递给它。startFuture
立即调用该函数以获取 Future
,并调用 Future
的 get
方法以尝试获取其值。它返回一个 Observable,它将此值发射到任何后续观察者。
rxjava-async
模块还包含 deferFuture
操作符。您将一个返回一个返回 Observable 的 Future
的函数传递给它。deferFuture
返回一个 Observable,但在观察者订阅它返回的 Observable 之前不会调用您提供的函数。当它这样做时,它会立即调用结果 Future
上的 get
,然后将 Future
返回的 Observable 的发射作为它自己的发射镜像。
这样,您就可以在 Observable 级联中包含一个返回 Observable 的 Future
作为其他 Observable 的对等方。
rxjava-async
模块还包含 fromAction
操作符。它接受一个 Action
作为参数,并返回一个 Observable,该 Observable 在 Action
终止时发射您传递给 fromAction
的项。
rxjava-async
模块还包含 fromCallable
操作符。它接受一个 Callable
作为参数,并返回一个 Observable,该 Observable 将此可调用对象的返回值作为其唯一的发射。
rxjava-async
模块还包含 fromRunnable
操作符。它接受一个 Runnable
作为参数,并返回一个 Observable,该 Observable 在 Runnable
终止时发射您传递给 fromRunnable
的项。
rxjava-async
模块还包含 forEachFuture
操作符。它实际上不是 Start 操作符的变体,而是一个完全独立的东西。您将一些典型的观察者方法子集(onNext
、onError
和 onCompleted
)传递给 forEachFuture
,Observable 将按通常方式调用这些方法。但 forEachFuture
本身返回一个 Future
,它在 get
上阻塞,直到源 Observable 完成,然后返回完成或错误,具体取决于 Observable 的完成方式。
如果您需要一个阻塞直到 Observable 完成的函数,可以使用它。
rxjava-async
模块还包含 runAsync
操作符。它很奇怪,因为它创建了一个称为 StoppableObservable
的 Observable 特化版本。
将 runAsync
传递给一个 Action
和一个 Scheduler
,它将返回一个 StoppableObservable
,该 Observable 使用指定的 Action
生成它发射的项。Action
接受一个 Observer
和一个 Subscription
。它使用 Subscription
检查 unsubscribed
条件,如果条件满足,它将停止发射项。您还可以通过调用其 unsubscribe
方法随时手动停止 StoppableObservable
(这也会取消订阅您与 StoppableObservable
关联的 Subscription
)。
由于 runAsync
立即调用 Action
并开始发射项(即,它产生一个热 Observable),因此在您使用此操作符建立 StoppableObservable
和您的 Observer
准备好接收项之间的时间间隔内,可能会丢失一些项。如果这是一个问题,您可以使用 runAsync
的变体,它也接受一个 Subject
,并将一个 ReplaySubject
传递给它,您可以使用它来检索否则会丢失的项。
在 RxGroovy 中,还有一个 From 操作符的版本,它将 Future
转换为 Observable,并因此类似于 Start 操作符。
各种 RxJava 实现的 Start 位于可选的 rxjava-async
模块中。
rxjava-async
模块包含 start
操作符,它接受一个函数作为参数,调用该函数来检索一个值,然后返回一个 Observable,它将该值发射到每个后续观察者。
注意,即使多个观察者订阅了生成的 Observable,该函数也只执行一次。
rxjava-async
模块还包含 toAsync
、asyncAction
和 asyncFunc
操作符。这些操作符接受一个函数或一个 Action 作为参数。对于函数,这种变体操作符调用该函数来检索一个值,然后返回一个 Observable,它将该值发射到每个后续观察者(就像 start
操作符一样)。
对于 Action,过程类似,但没有返回值。在这种情况下,由该操作符创建的 Observable 将在终止之前发射一个 null
。
注意,即使多个观察者订阅了生成的 Observable,该函数或 Action 也只执行一次。
rxjava-async
模块还包含 startFuture
操作符。您将一个返回 Future
的函数传递给它。startFuture
立即调用该函数以获取 Future
,并调用 Future
的 get
方法以尝试获取其值。它返回一个 Observable,它将此值发射到任何后续观察者。
rxjava-async
模块还包含 deferFuture
操作符。您将一个返回一个返回 Observable 的 Future
的函数传递给它。deferFuture
返回一个 Observable,但在观察者订阅它返回的 Observable 之前不会调用您提供的函数。当它这样做时,它会立即调用结果 Future
上的 get
,然后将 Future
返回的 Observable 的发射作为它自己的发射镜像。
这样,您就可以在 Observable 级联中包含一个返回 Observable 的 Future
作为其他 Observable 的对等方。
rxjava-async
模块还包含 fromAction
操作符。它接受一个 Action
作为参数,并返回一个 Observable,该 Observable 在 Action
终止时发射您传递给 fromAction
的项。
rxjava-async
模块还包含 fromCallable
操作符。它接受一个 Callable
作为参数,并返回一个 Observable,该 Observable 将此可调用对象的返回值作为其唯一的发射。
rxjava-async
模块还包含 fromRunnable
操作符。它接受一个 Runnable
作为参数,并返回一个 Observable,该 Observable 在 Runnable
终止时发射您传递给 fromRunnable
的项。
rxjava-async
模块还包含 forEachFuture
操作符。它实际上不是 Start 操作符的变体,而是一个完全独立的东西。您将一些典型的观察者方法子集(onNext
、onError
和 onCompleted
)传递给 forEachFuture
,Observable 将按通常方式调用这些方法。但 forEachFuture
本身返回一个 Future
,它在 get
上阻塞,直到源 Observable 完成,然后返回完成或错误,具体取决于 Observable 的完成方式。
如果您需要一个阻塞直到 Observable 完成的函数,可以使用它。
rxjava-async
模块还包含 runAsync
操作符。它很奇怪,因为它创建了一个称为 StoppableObservable
的 Observable 特化版本。
将 runAsync
传递给一个 Action
和一个 Scheduler
,它将返回一个 StoppableObservable
,该 Observable 使用指定的 Action
生成它发射的项。Action
接受一个 Observer
和一个 Subscription
。它使用 Subscription
检查 unsubscribed
条件,如果条件满足,它将停止发射项。您还可以通过调用其 unsubscribe
方法随时手动停止 StoppableObservable
(这也会取消订阅您与 StoppableObservable
关联的 Subscription
)。
由于 runAsync
立即调用 Action
并开始发射项(即,它产生一个热 Observable),因此在您使用此操作符建立 StoppableObservable
和您的 Observer
准备好接收项之间的时间间隔内,可能会丢失一些项。如果这是一个问题,您可以使用 runAsync
的变体,它也接受一个 Subject
,并将一个 ReplaySubject
传递给它,您可以使用它来检索否则会丢失的项。
在 RxJava 中,还有一个 From 操作符的版本,它将 Future
转换为 Observable,并因此类似于 Start 操作符。
RxJS 实现 start
操作符。它将返回值将是结果 Observable 的发射的函数作为参数,以及可选地传递给该函数的任何其他参数和一个 Scheduler,在该 Scheduler 上运行该函数。
var context = { value: 42 }; var source = Rx.Observable.start( function () { return this.value; }, context, Rx.Scheduler.timeout ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 42 Completed
start
位于以下发行版中
rx.async.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)rx.async.compat.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
RxJS 还实现 startAsync
操作符。它将返回值将是结果 Observable 的发射的异步函数作为参数。
您可以使用 toAsync
方法将函数转换为异步函数。它将函数、函数参数和 Scheduler 作为参数,并返回将在指定 Scheduler 上调用的异步函数。最后两个参数是可选的;如果您没有指定 Scheduler,则默认情况下将使用 timeout
Scheduler。
var source = Rx.Observable.startAsync(function () { return RSVP.Promise.resolve(42); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 42 Completed
startAsync
位于以下发行版中
rx.async.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)rx.async.compat.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
toAsync
位于以下发行版中
rx.async.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)rx.async.compat.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)RxPHP 将此操作符实现为 start
。
在指定调度器上异步调用指定的函数,通过可观察序列呈现结果。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/start/start.php $source = Rx\Observable::start(function () { return 42; }); $source->subscribe($stdoutObserver);
Next value: 42 Complete!
待定
待定