ReactiveX

ReactiveX 是一个用于通过使用可观察序列来组合异步和基于事件的程序的库。

它扩展了 观察者模式 以支持数据和/或事件序列,并添加了运算符,这些运算符允许您以声明式的方式将序列组合在一起,同时抽象出低级线程、同步、线程安全性、并发数据结构和非阻塞 I/O 等方面的关注。

可观察对象通过成为访问多个项目的异步序列的理想方式来填补空白
单个项目多个项目
同步的T getData()Iterable<T> getData()
异步的Future<T> getData()Observable<T> getData()

它有时被称为“函数式响应式编程”,但这是一种误称。ReactiveX 可能是函数式的,也可能是响应式的,但“函数式响应式编程”是另一种动物。一个主要的区别是,函数式响应式编程操作的是随时间连续变化的值,而 ReactiveX 操作的是随时间发射的离散值。(有关函数式响应式编程的更准确信息,请参见 Conal Elliott 的作品。)

为什么要使用可观察对象?

ReactiveX Observable 模型允许您用与用于数组等数据项集合的简单、可组合的操作相同的操作来处理异步事件流。它让您摆脱了错综复杂的回调网,从而使您的代码更具可读性,并且不易出错。

可观察对象是可组合的

诸如 Java Futures 之类的技术对于 单个级别的异步执行 来说很容易使用,但是当它们嵌套时,它们会开始增加 非平凡的复杂性

使用 Futures 来最佳地组合条件异步执行流 很困难 (或者是不可能的,因为每个请求的延迟在运行时都会有所不同)。当然,这 可以做到,但它很快就会变得复杂 (从而容易出错) 或者它会过早地在 Future.get() 上阻塞,这消除了异步执行的好处。

另一方面,ReactiveX Observables 的目的组合异步数据的流和序列

可观察对象是灵活的

ReactiveX Observables 不仅支持发射单个标量值 (如 Futures 所做的那样),还支持发射值序列甚至无限流。Observable 是一个可以在任何这些用例中使用的单一抽象。Observable 具有与其镜像对等物 Iterable 相关联的所有灵活性和优雅性。

</table>

可观察对象更少意见

ReactiveX 不偏向于某种特定的并发或异步源。可观察对象可以使用线程池、事件循环、非阻塞 I/O、actor (如 Akka 中的 actor) 或者任何适合您需要、风格或专业知识的实现来实现。客户端代码将其与 Observables 的所有交互都视为异步的,无论您的底层实现是阻塞还是非阻塞,以及您选择如何实现它。

Observable 是异步/推送 “对偶” 到同步/拉取 Iterable
事件Iterable (拉取)Observable (推送)
检索数据T next()onNext(T)
发现错误抛出 ExceptiononError(Exception)
完成!hasNext()onCompleted()
这个 Observable 是如何实现的?
public Observable<data> getData();
从 Observer 的角度来看,这无关紧要!
  • 它是在与调用者相同的线程上同步工作吗?
  • 它是在不同的线程上异步工作吗?
  • 它是在可能以任何顺序将数据返回给调用者的多个线程上划分其工作吗?
  • 它是否使用 Actor (或多个 Actor) 而不是线程池?
  • 它是否使用 NIO 与事件循环进行异步网络访问?
  • 它是否使用事件循环来分离工作线程和回调线程?

重要的是:使用 ReactiveX,您以后可以改变主意,并且可以从根本上改变 Observable 实现的底层性质,而不会破坏 Observable 的使用者。

回调也有自己的问题

回调通过不允许任何东西阻塞来解决过早地在 Future.get() 上阻塞的问题。它们天生高效,因为它们在响应准备好时执行。

但与 Futures 一样,虽然回调在使用单个级别的异步执行时很容易使用,但 在嵌套组合中,它们变得笨拙

ReactiveX 是一个多语言实现

ReactiveX 目前已在多种语言中实现,以尊重这些语言的习惯用法,并且更多语言正在以飞快的速度被添加。

响应式编程

ReactiveX 提供了 一组运算符,您可以使用这些运算符来过滤、选择、转换、组合和组合 Observables。这允许高效地执行和组合。

您可以将 Observable 类视为 Iterable (它是“拉取”) 的“推送”等价物。对于 Iterable,使用者从生产者那里拉取值,并且线程会阻塞直到这些值到达。相反,对于 Observable,生产者会在值可用时将值推送到使用者。这种方法更灵活,因为值可以同步或异步到达。

显示如何将类似的高阶函数应用于 Iterable 和 Observable 的示例代码
IterableObservable
getDataFromLocalMemory()
  .skip(10)
  .take(5)
  .map({ s -> return s + " transformed" })
  .forEach({ println "next => " + it })
getDataFromNetwork()
  .skip(10)
  .take(5)
  .map({ s -> return s + " transformed" })
  .subscribe({ println "onNext => " + it })

Observable 类型在 四人帮的观察者模式 中添加了两个缺失的语义,以匹配在 Iterable 类型中可用的语义

  1. 生产者向使用者发出信号,表明不再有数据可用 (在这种情况下,Iterable 上的 foreach 循环完成并正常返回;Observable 调用其观察者的 onCompleted 方法)
  2. 生产者向使用者发出信号,表明发生了错误 (如果迭代过程中发生错误,Iterable 会抛出异常;Observable 调用其观察者的 onError 方法)

通过这些添加,ReactiveX 使 Iterable 和 Observable 类型协调一致。它们之间唯一的区别是数据流的方向。这非常重要,因为现在您可以对 Iterable 执行的任何操作,也可以对 Observable 执行。