Start

创建发射函数式指令返回值的 Observable

Start

编程语言有很多方法来获取计算结果,比如函数、future、操作、可调用对象、可运行对象等等。这些方法被归类在 Start 操作符类别下,让它们的行为类似于 Observable,以便它们可以在 Observable 级联中与其他 Observable 链式调用。

另请参阅

语言特定信息

待定

待定

各种 RxGroovy 实现的 Start 位于可选的 rxjava-async 模块中。

start

rxjava-async 模块包含 start 操作符,它接受一个函数作为参数,调用该函数来检索一个值,然后返回一个 Observable,它将该值发射到每个后续观察者。

注意,即使多个观察者订阅了生成的 Observable,该函数也只执行一次。

toAsync

rxjava-async 模块还包含 toAsyncasyncActionasyncFunc 操作符。这些操作符接受一个函数或一个 Action 作为参数。对于函数,这种变体操作符调用该函数来检索一个值,然后返回一个 Observable,它将该值发射到每个后续观察者(就像 start 操作符一样)。

对于 Action,过程类似,但没有返回值。在这种情况下,由该操作符创建的 Observable 将在终止之前发射一个 null

注意,即使多个观察者订阅了生成的 Observable,该函数或 Action 也只执行一次。

startFuture

rxjava-async 模块还包含 startFuture 操作符。您将一个返回 Future 的函数传递给它。startFuture 立即调用该函数以获取 Future,并调用 Futureget 方法以尝试获取其值。它返回一个 Observable,它将此值发射到任何后续观察者。

deferFuture

rxjava-async 模块还包含 deferFuture 操作符。您将一个返回一个返回 Observable 的 Future 的函数传递给它。deferFuture 返回一个 Observable,但在观察者订阅它返回的 Observable 之前不会调用您提供的函数。当它这样做时,它会立即调用结果 Future 上的 get,然后将 Future 返回的 Observable 的发射作为它自己的发射镜像。

这样,您就可以在 Observable 级联中包含一个返回 Observable 的 Future 作为其他 Observable 的对等方。

fromAction

rxjava-async 模块还包含 fromAction 操作符。它接受一个 Action 作为参数,并返回一个 Observable,该 Observable 在 Action 终止时发射您传递给 fromAction 的项。

fromCallable

rxjava-async 模块还包含 fromCallable 操作符。它接受一个 Callable 作为参数,并返回一个 Observable,该 Observable 将此可调用对象的返回值作为其唯一的发射。

fromRunnable

rxjava-async 模块还包含 fromRunnable 操作符。它接受一个 Runnable 作为参数,并返回一个 Observable,该 Observable 在 Runnable 终止时发射您传递给 fromRunnable 的项。

forEachFuture

rxjava-async 模块还包含 forEachFuture 操作符。它实际上不是 Start 操作符的变体,而是一个完全独立的东西。您将一些典型的观察者方法子集(onNextonErroronCompleted)传递给 forEachFuture,Observable 将按通常方式调用这些方法。但 forEachFuture 本身返回一个 Future,它在 get 上阻塞,直到源 Observable 完成,然后返回完成或错误,具体取决于 Observable 的完成方式。

如果您需要一个阻塞直到 Observable 完成的函数,可以使用它。

rxjava-async 模块还包含 runAsync 操作符。它很奇怪,因为它创建了一个称为 StoppableObservable 的 Observable 特化版本。

runAsync 传递给一个 Action 和一个 Scheduler,它将返回一个 StoppableObservable,该 Observable 使用指定的 Action 生成它发射的项。Action 接受一个 Observer 和一个 Subscription。它使用 Subscription 检查 unsubscribed 条件,如果条件满足,它将停止发射项。您还可以通过调用其 unsubscribe 方法随时手动停止 StoppableObservable(这也会取消订阅您与 StoppableObservable 关联的 Subscription)。

由于 runAsync 立即调用 Action 并开始发射项(即,它产生一个 Observable),因此在您使用此操作符建立 StoppableObservable 和您的 Observer 准备好接收项之间的时间间隔内,可能会丢失一些项。如果这是一个问题,您可以使用 runAsync 的变体,它也接受一个 Subject,并将一个 ReplaySubject 传递给它,您可以使用它来检索否则会丢失的项。

在 RxGroovy 中,还有一个 From 操作符的版本,它将 Future 转换为 Observable,并因此类似于 Start 操作符。

各种 RxJava 实现的 Start 位于可选的 rxjava-async 模块中。

start

rxjava-async 模块包含 start 操作符,它接受一个函数作为参数,调用该函数来检索一个值,然后返回一个 Observable,它将该值发射到每个后续观察者。

注意,即使多个观察者订阅了生成的 Observable,该函数也只执行一次。

toAsync

rxjava-async 模块还包含 toAsyncasyncActionasyncFunc 操作符。这些操作符接受一个函数或一个 Action 作为参数。对于函数,这种变体操作符调用该函数来检索一个值,然后返回一个 Observable,它将该值发射到每个后续观察者(就像 start 操作符一样)。

对于 Action,过程类似,但没有返回值。在这种情况下,由该操作符创建的 Observable 将在终止之前发射一个 null

注意,即使多个观察者订阅了生成的 Observable,该函数或 Action 也只执行一次。

startFuture

rxjava-async 模块还包含 startFuture 操作符。您将一个返回 Future 的函数传递给它。startFuture 立即调用该函数以获取 Future,并调用 Futureget 方法以尝试获取其值。它返回一个 Observable,它将此值发射到任何后续观察者。

deferFuture

rxjava-async 模块还包含 deferFuture 操作符。您将一个返回一个返回 Observable 的 Future 的函数传递给它。deferFuture 返回一个 Observable,但在观察者订阅它返回的 Observable 之前不会调用您提供的函数。当它这样做时,它会立即调用结果 Future 上的 get,然后将 Future 返回的 Observable 的发射作为它自己的发射镜像。

这样,您就可以在 Observable 级联中包含一个返回 Observable 的 Future 作为其他 Observable 的对等方。

fromAction

rxjava-async 模块还包含 fromAction 操作符。它接受一个 Action 作为参数,并返回一个 Observable,该 Observable 在 Action 终止时发射您传递给 fromAction 的项。

fromCallable

rxjava-async 模块还包含 fromCallable 操作符。它接受一个 Callable 作为参数,并返回一个 Observable,该 Observable 将此可调用对象的返回值作为其唯一的发射。

fromRunnable

rxjava-async 模块还包含 fromRunnable 操作符。它接受一个 Runnable 作为参数,并返回一个 Observable,该 Observable 在 Runnable 终止时发射您传递给 fromRunnable 的项。

forEachFuture

rxjava-async 模块还包含 forEachFuture 操作符。它实际上不是 Start 操作符的变体,而是一个完全独立的东西。您将一些典型的观察者方法子集(onNextonErroronCompleted)传递给 forEachFuture,Observable 将按通常方式调用这些方法。但 forEachFuture 本身返回一个 Future,它在 get 上阻塞,直到源 Observable 完成,然后返回完成或错误,具体取决于 Observable 的完成方式。

如果您需要一个阻塞直到 Observable 完成的函数,可以使用它。

rxjava-async 模块还包含 runAsync 操作符。它很奇怪,因为它创建了一个称为 StoppableObservable 的 Observable 特化版本。

runAsync 传递给一个 Action 和一个 Scheduler,它将返回一个 StoppableObservable,该 Observable 使用指定的 Action 生成它发射的项。Action 接受一个 Observer 和一个 Subscription。它使用 Subscription 检查 unsubscribed 条件,如果条件满足,它将停止发射项。您还可以通过调用其 unsubscribe 方法随时手动停止 StoppableObservable(这也会取消订阅您与 StoppableObservable 关联的 Subscription)。

由于 runAsync 立即调用 Action 并开始发射项(即,它产生一个 Observable),因此在您使用此操作符建立 StoppableObservable 和您的 Observer 准备好接收项之间的时间间隔内,可能会丢失一些项。如果这是一个问题,您可以使用 runAsync 的变体,它也接受一个 Subject,并将一个 ReplaySubject 传递给它,您可以使用它来检索否则会丢失的项。

在 RxJava 中,还有一个 From 操作符的版本,它将 Future 转换为 Observable,并因此类似于 Start 操作符。

start

RxJS 实现 start 操作符。它将返回值将是结果 Observable 的发射的函数作为参数,以及可选地传递给该函数的任何其他参数和一个 Scheduler,在该 Scheduler 上运行该函数。

示例代码

var context = { value: 42 };

var source = Rx.Observable.start(
    function () {
        return this.value;
    },
    context,
    Rx.Scheduler.timeout
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: 42
Completed

start 位于以下发行版中

  • rx.async.js(需要 rx.binding.js 以及 rx.jsrx.compat.js
  • rx.async.compat.js(需要 rx.binding.js 以及 rx.jsrx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
start

RxJS 还实现 startAsync 操作符。它将返回值将是结果 Observable 的发射的异步函数作为参数。

您可以使用 toAsync 方法将函数转换为异步函数。它将函数、函数参数和 Scheduler 作为参数,并返回将在指定 Scheduler 上调用的异步函数。最后两个参数是可选的;如果您没有指定 Scheduler,则默认情况下将使用 timeout Scheduler。

示例代码

var source = Rx.Observable.startAsync(function () {
    return RSVP.Promise.resolve(42);
});

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: 42
Completed

startAsync 位于以下发行版中

  • rx.async.js(需要 rx.binding.js 以及 rx.jsrx.compat.js
  • rx.async.compat.js(需要 rx.binding.js 以及 rx.jsrx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

toAsync 位于以下发行版中

  • rx.async.js(需要 rx.binding.js 以及 rx.jsrx.compat.js
  • rx.async.compat.js(需要 rx.binding.js 以及 rx.jsrx.compat.js

RxPHP 将此操作符实现为 start

在指定调度器上异步调用指定的函数,通过可观察序列呈现结果。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/start/start.php

$source = Rx\Observable::start(function () {
    return 42;
});

$source->subscribe($stdoutObserver);

   
Next value: 42
Complete!
    

待定

待定