From

将各种其他对象和数据类型转换为 Observables

From

当您使用 Observables 时,如果所有要处理的数据都可以表示为 Observables,而不是 Observables 和其他类型的混合,会更方便。这样您就可以使用一组操作符来控制数据流的整个生命周期。

例如,可迭代对象可以被视为一种同步的 Observable;Futures 是一种始终只发射单个项目的 Observable。通过将这些对象显式转换为 Observables,您可以允许它们与其他 Observables 作为对等体进行交互。

出于这个原因,大多数 ReactiveX 实现都有方法允许您将某些特定于语言的对象和数据结构转换为 Observables。

另请参见

特定于语言的信息

待定

待定

from

在 RxGroovy 中,from 操作符可以转换 Future、Iterable 或 Array。对于 Iterable 或 Array,生成的 Observable 将发射 Iterable 或 Array 中包含的每个项目。

对于 Future,它将发射 get 调用的单个结果。您可以选择传递接受 future 的 from 版本,该版本另外两个参数表示超时跨度和跨度的计量单位。如果在 Future 响应值之前经过该时间跨度,生成的 Observable 将以错误终止。

from 默认情况下不会在任何特定的 Scheduler 上运行,但是您可以传递将 Future 转换为 Scheduler 的变体,它将作为可选的第二个参数,它将使用该 Scheduler 来控制 Future。

fromFunc0

此外,在 RxJavaAsyncUtil 包中,您可以使用以下操作符将操作、可调用对象、函数和可运行对象转换为发射这些内容结果的 Observables

  • fromAction
  • fromCallable
  • fromFunc0
  • fromRunnable

有关这些操作符的更多信息,请参见 Start 操作符。

from

请注意,还有一个 from 操作符是可选 StringObservable 类的成员方法。它将字符流或 Reader 转换为发射字节数组或字符串的 Observable。

在单独的 RxJavaAsyncUtil 包中(默认情况下不包含在 RxGroovy 中),还有一个 runAsync 函数。将 runAsync 传递给 Action 和一个 Scheduler,它将返回一个 StoppableObservable,该 StoppableObservable 使用指定的 Action 来生成它发射的项目。

Action 接受一个 Observer 和一个 Subscription。它使用 Subscription 来检查 isUnsubscribed 条件,如果该条件成立,它将停止发射项目。您也可以随时通过调用其 unsubscribe 方法来手动停止 StoppableObservable(这也会取消您与 StoppableObservable 关联的 Subscription 的订阅)。

因为 runAsync 会立即调用 Action 并开始发射项目,所以有可能在您使用此方法建立 StoppableObservable 和您的 Observer 准备接收项目之间的时间间隔内会丢失一些项目。如果这是个问题,您可以使用 runAsync 的变体,该变体还接受一个 Subject,并将 ReplaySubject 传递给它,您可以使用该 ReplaySubject 来检索其他情况下丢失的项目。

decode

StringObservable 类(不是 RxGroovy 的默认部分)还包括 decode 操作符,该操作符将多字节字符流转换为发射尊重字符边界的字节数组的 Observable。

from

在 RxJava 中,from 操作符可以转换 Future、Iterable 或 Array。对于 Iterable 或 Array,生成的 Observable 将发射 Iterable 或 Array 中包含的每个项目。

示例代码

Integer[] items = { 0, 1, 2, 3, 4, 5 };
Observable myObservable = Observable.from(items);

myObservable.subscribe(
    new Action1<Integer>() {
        @Override
        public void call(Integer item) {
            System.out.println(item);
        }
    },
    new Action1<Throwable>() {
        @Override
        public void call(Throwable error) {
            System.out.println("Error encountered: " + error.getMessage());
        }
    },
    new Action0() {
        @Override
        public void call() {
            System.out.println("Sequence complete");
        }
    }
);
0
1
2
3
4
5
Sequence complete

对于 Future,它将发射 get 调用的单个结果。您可以选择传递接受 future 的 from 版本,该版本另外两个参数表示超时跨度和跨度的计量单位。如果在 Future 响应值之前经过该时间跨度,生成的 Observable 将以错误终止。

from 默认情况下不会在任何特定的 Scheduler 上运行,但是您可以传递将 Future 转换为 Scheduler 的变体,它将作为可选的第二个参数,它将使用该 Scheduler 来控制 Future。

fromFunc0

此外,在 RxJavaAsyncUtil 包中,您可以使用以下操作符将操作、可调用对象、函数和可运行对象转换为发射这些内容结果的 Observables

  • fromAction
  • fromCallable
  • fromFunc0
  • fromRunnable

有关这些操作符的更多信息,请参见 Start 操作符。

from

请注意,还有一个 from 操作符是可选 StringObservable 类的成员方法。它将字符流或 Reader 转换为发射字节数组或字符串的 Observable。

在单独的 RxJavaAsyncUtil 包中(默认情况下不包含在 RxJava 中),还有一个 runAsync 函数。将 runAsync 传递给 Action 和一个 Scheduler,它将返回一个 StoppableObservable,该 StoppableObservable 使用指定的 Action 来生成它发射的项目。

Action 接受一个 Observer 和一个 Subscription。它使用 Subscription 来检查 isUnsubscribed 条件,如果该条件成立,它将停止发射项目。您也可以随时通过调用其 unsubscribe 方法来手动停止 StoppableObservable(这也会取消您与 StoppableObservable 关联的 Subscription 的订阅)。

因为 runAsync 会立即调用 Action 并开始发射项目,所以有可能在您使用此方法建立 StoppableObservable 和您的 Observer 准备接收项目之间的时间间隔内会丢失一些项目。如果这是个问题,您可以使用 runAsync 的变体,该变体还接受一个 Subject,并将 ReplaySubject 传递给它,您可以使用该 ReplaySubject 来检索其他情况下丢失的项目。

decode

StringObservable 类(不是 RxGroovy 的默认部分)还包括 decode 操作符,该操作符将多字节字符流转换为发射尊重字符边界的字节数组的 Observable。

RxJS 中有几个专门的 From 变体

from

在 RxJS 中,from 操作符将类数组或可迭代对象转换为发射该数组或可迭代对象中的项目的 Observable。在这种情况下,字符串被视为字符数组。

此操作符还接受三个额外的可选参数

  1. 一个转换函数,它将来自数组或可迭代对象的项目作为输入,并生成一个要由生成的 Observable 发射的项目作为输出
  2. 作为附加上下文信息传递给转换函数的第二个参数
  3. 一个 Scheduler,该操作符应该在其上运行

示例代码

// Array-like object (arguments) to Observable
function f() {
  return Rx.Observable.from(arguments);
}

f(1, 2, 3).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: 1
Next: 2
Next: 3
Completed
// Any iterable object...
// Set
var s = new Set(['foo', window]);
Rx.Observable.from(s).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: foo
Next: window
Completed
// Map
var m = new Map([[1, 2], [2, 4], [4, 8]]);
Rx.Observable.from(m).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: [1, 2]
Next: [2, 4]
Next: [4, 8]
Completed
// String
Rx.Observable.from("foo").subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: f
Next: o
Next: o
Completed
// Using an arrow function as the map function to manipulate the elements
Rx.Observable.from([1, 2, 3], function (x) { return x + x; }).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: 2
Next: 4
Next: 6
Completed
// Generate a sequence of numbers
Rx.Observable.from({length: 5}, function(v, k) { return k; }).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Completed

from 存在于以下发行版中

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
fromCallback

fromCallback 操作符接受一个函数作为参数,调用此函数,并将从该函数返回的值作为其单个发射发射。

此操作符还接受两个额外的可选参数

  1. 要传递给回调函数的参数
  2. 一个转换函数,它将回调函数的返回值作为输入,并返回一个要由生成的 Observable 发射的项目

示例代码

var fs = require('fs'),
    Rx = require('rx');

// Wrap fs.exists
var exists = Rx.Observable.fromCallback(fs.exists);

// Check if file.txt exists
var source = exists('file.txt');

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: true
Completed

fromCallback 存在于以下发行版中

  • rx.all.js
  • rx.all.compat.js
  • rx.async.js(需要 rx.binding.jsrx.jsrx.compat.js
  • rx.async.compat.js(需要 rx.binding.jsrx.jsrx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

还有一个 fromNodeCallback 操作符,它专门用于 Node.js 中的回调函数类型。

此操作符接受三个额外的可选参数

  1. 一个 Scheduler,您想要在其上运行 Node.js 回调
  2. 要传递给回调函数的参数
  3. 一个转换函数,它将回调函数的返回值作为输入,并返回一个要由生成的 Observable 发射的项目

示例代码

var fs = require('fs'),
    Rx = require('rx');

// Wrap fs.exists
var rename = Rx.Observable.fromNodeCallback(fs.rename);

// Rename file which returns no parameters except an error
var source = rename('file1.txt', 'file2.txt');

var subscription = source.subscribe(
    function () { console.log('Next: success!'); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: success!
Completed

fromNodeCallback 存在于以下发行版中

  • rx.async.js(需要 rx.binding.jsrx.jsrx.compat.js
  • rx.async.compat.js(需要 rx.binding.jsrx.jsrx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
fromEvent

fromEvent 操作符接受一个“元素”和一个事件名称作为参数,然后它监听在该元素上发生的该名称的事件。它返回一个发射这些事件的 Observable。“元素”可以是简单的 DOM 元素,也可以是 NodeList、jQuery 元素、Zepto 元素、Angular 元素、Ember.js 元素或 EventEmitter。

此操作符还接受一个可选的第三个参数:一个函数,该函数接受事件处理程序中的参数作为参数,并返回一个要由生成的 Observable 发射的项目来代替事件。

示例代码

// using a jQuery element
var input = $('#input');

var source = Rx.Observable.fromEvent(input, 'click');

var subscription = source.subscribe(
    function (x) { console.log('Next: Clicked!'); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

input.trigger('click');
Next: Clicked!
// using a Node.js EventEmitter and the optional third parameter
var EventEmitter = require('events').EventEmitter,
    Rx = require('rx');

var eventEmitter = new EventEmitter();

var source = Rx.Observable.fromEvent(
    eventEmitter,
    'data',
    function (first, second) {
        return { foo: first, bar: second };
    });

var subscription = source.subscribe(
    function (x) {
        console.log('Next: foo -' + x.foo + ', bar -' + x.bar);
    },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

eventEmitter.emit('data', 'baz', 'quux');
Next: foo - baz, bar - quux

fromEvent 存在于以下发行版中

  • rx.async.js(需要 rx.binding.jsrx.jsrx.compat.js
  • rx.async.compat.js(需要 rx.binding.jsrx.jsrx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

fromEventPattern 操作符类似,不同的是它没有接受元素和事件名称作为参数,而是接受两个函数作为参数。第一个函数将事件监听器附加到各种元素上的各种事件;第二个函数删除这组监听器。这样,您就可以建立一个发射代表各种事件和各种目标元素的项目的单个 Observable。

示例代码

var input = $('#input');

var source = Rx.Observable.fromEventPattern(
    function add (h) {
        input.bind('click', h);
    },
    function remove (h) {
        input.unbind('click', h);
    }
);

var subscription = source.subscribe(
    function (x) { console.log('Next: Clicked!'); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

input.trigger('click');
Next: Clicked!
of

of 操作符接受多个项目作为参数,并返回一个发射这些参数的 Observable,按顺序作为其发射的序列。

示例代码

var source = Rx.Observable.of(1,2,3);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 1
Next: 2
Next: 3
Completed

of 存在于以下发行版中

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

此操作符的一个变体称为 ofWithScheduler,它将 Scheduler 作为其第一个参数,并在该 Scheduler 上运行生成的 Observable。

还有一个 fromPromise 操作符,它将 Promise 转换为 Observable,将它的 resolve 调用转换为 onNext 通知,并将它的 reject 调用转换为 onError 通知。

fromPromise 存在于以下发行版中

  • rx.async.js(需要 rx.binding.jsrx.jsrx.compat.js
  • rx.async.compat.js(需要 rx.binding.jsrx.jsrx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

示例代码

var promise = new RSVP.Promise(function (resolve, reject) {
   resolve(42);
});

var source = Rx.Observable.fromPromise(promise);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (e) { console.log('Error: ' + e); },
    function ( ) { console.log('Completed'); });
Next: 42:
Completed
var promise = new RSVP.Promise(function (resolve, reject) {
   reject(new Error('reason'));
});

var source = Rx.Observable.fromPromise(promise);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (e) { console.log('Error: ' + e); },
    function ( ) { console.log('Completed'); });
Error: Error: reject

还有一个 ofArrayChanges 操作符,它使用 Array.observe 方法监控数组,并返回一个发射数组中发生的任何更改的 Observable。此操作符仅存在于 rx.all.js 发行版中。

示例代码

var arr = [1,2,3];
var source = Rx.Observable.ofArrayChanges(arr);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (e) { console.log('Error: ' + e); },
    function ( ) { console.log('Completed'); });

arr.push(4)
Next: {type: "splice", object: Array[4], index: 3, removed: Array[0], addedCount: 1}

类似的操作符是 ofObjectChanges。它返回一个发射对特定对象进行的任何更改的 Observable,如 Object.observe 方法报告的那样。它也仅存在于 rx.all.js 发行版中。

示例代码

var obj = {x: 1};
var source = Rx.Observable.ofObjectChanges(obj);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (e) { console.log('Error: ' + e); },
    function ( ) { console.log('Completed'); });

obj.x = 42;
Next: {type: "update", object: Object, name: "x", oldValue: 1}

还有一个 pairs 操作符。此操作符接受一个对象,并返回一个发射该对象的属性作为键值对的 Observable。

示例代码

var obj = {
  foo: 42,
  bar: 56,
  baz: 78
};

var source = Rx.Observable.pairs(obj);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (e) { console.log('Error: ' + e); },
    function ( ) { console.log('Completed'); });
Next: ['foo', 42]
Next: ['bar', 56]
Next: ['baz', 78]
Completed

pairs 存在于以下发行版中

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

RxPHP 将此操作符实现为 fromArray

将数组转换为可观察序列

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/fromArray/fromArray.php

$source = \Rx\Observable::fromArray([1, 2, 3, 4]);

$subscription = $source->subscribe($stdoutObserver);

//Next value: 1
//Next value: 2
//Next value: 3
//Next value: 4
//Complete!

   
Next value: 1
Next value: 2
Next value: 3
Next value: 4
Complete!
    

RxPHP 还有一个 fromIterator 操作符。

将迭代器转换为可观察序列

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/iterator/iterator.php

$generator = function () {
    for ($i = 1; $i <= 3; $i++) {
        yield $i;
    }

    return 4;
};

$source = Rx\Observable::fromIterator($generator());

$source->subscribe($stdoutObserver);

   
Next value: 1
Next value: 2
Next value: 3
Next value: 4
Complete!
    

RxPHP 还有一个 asObservable 操作符。

隐藏可观察序列的身份。

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/asObservable/asObservable.php

// Create subject
$subject = new \Rx\Subject\AsyncSubject();

// Send a value
$subject->onNext(42);
$subject->onCompleted();

// Hide its type
$source = $subject->asObservable();

$source->subscribe($stdoutObserver);
   
Next value: 42
Complete!
    

RxPHP 还有一个 fromPromise 操作符。

将 Promise 转换为 Observable

示例代码

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/promise/fromPromise.php

$promise = \React\Promise\resolve(42);

$source = \Rx\Observable::fromPromise($promise);

$subscription = $source->subscribe($stdoutObserver);

   
Next value: 42
Complete!
    

在 Swift 中,这是使用 Observable.from 类方法实现的。

数组中的每个元素都作为发射产生。此方法与 Observable.just 之间的区别在于后者将整个数组作为一个发射发射。

示例代码

let numbers = [1,2,3,4,5]

let source = Observable.from(numbers)

source.subscribe {
    print($0)
}
next(1)
next(2)
next(3)
next(4)
next(5)
completed