RxJava反应式编程(txt+pdf+epub+mobi电子书下载)


发布时间:2020-09-25 17:23:55

点击下载

作者:(波兰)托马什·努尔凯维茨(Tomasz Nurkiewicz)(美)本·克里斯滕森(Ben Christensen)

出版社:人民邮电出版社有限公司

格式: AZW3, DOCX, EPUB, MOBI, PDF, TXT

RxJava反应式编程

RxJava反应式编程试读:

前言

本书目标读者

本书适合中级和高级 Java 程序员。你应该非常熟悉 Java,但是并不需要预先掌握反应式编程的知识。本书中的很多概念都与函数式编程相关,不过你也无须预先了解这方面的知识。以下两类程序员都能从本书中获益。● 想提升服务器性能或者想让移动设备的代码更具可维护性的软件

工程师。如果你属于这一类的话,将会在本书中找到解决实际问

题的理念和方案,还有切实中肯的建议。在这种情况下,

RxJava 只是本书帮助你掌握的另一个工具。● 好奇的开发人员。他们可能听说过反应式编程,尤其是 RxJava,

并且想要真正地理解它。如果你属于这种情况,即使没计划在生

产环境的代码中使用 RxJava,本书也会开拓你的视野。

另外,如果你是一名真正的软件架构师,本书很可能会对你有所帮助。RxJava 会影响整个系统的总体架构,所以值得去了解。即便你刚刚体验编程,也可以尝试翻阅本书的前几章,这几章介绍了基础知识。一些底层的理念是通用的,如转换和组合,它们并不是反应式编程特有的。本 • 克里斯滕森的说明

2012 年,我正在为 Netflix API 实现一种新的架构。在这个过程中逐渐明确的是,为了实现目标,我们需要拥抱并发和异步网络请求。在探索实现方式的过程中,我遇到了 Jafar Husain,他努力向我“兜售”他在微软学到的名为“Rx”的方法。我当时虽然非常熟悉并发,但是依然按照命令式的方式进行思考,并且极度以 Java 为中心,因为 Java 一直是我的经济支柱,我在它上面花费了大量时间。

Jafar 向我“兜售”Rx 方法时,因其函数式编程风格,我很难理解这些理念,只能先搁置。

在数月的讨论之后,整体的系统架构不断成熟。在这个过程中,我和 Jafar 继续进行了多次白板会议,直到我掌握了这些理论原则,并且意识到 Reactive Extensions 的优雅和力量。

我们决定在 Netflix API 中采用 Rx 编程模型,最终创建了 Reactive Extensions 的 Java 实现,即 RxJava,它遵循了微软从 Rx.Net 和 RxJS 开始的命名约定。

在我从事 RxJava 开发工作的大约三年间,大部分工作都是在 GitHub 上以开放的方式完成的。我有幸与一个不断成长的社区一起工作,120 多位贡献者一起将 RxJava 变成了一个成熟的产品,它被用于很多生产系统中的服务器端和客户端。在 GitHub 上它获得了 15 000 多颗星,成为了排名前 200 名的项目之一,同时在使用 Java 的项目中排名第三。

Netflix 的 George Campbell、Aaron Tull 和 Matt Jacobs 在 RxJava 成熟的过程中起着关键作用,他们将 RxJava 从早期的构建版本变成了现在的样子,所做的工作包括添加了 lift、Subscriber、回压以及 JVM 多语言支持。Dávid Karnok 随后参与了进来,他提交代码的次数和行数都已经超过了我。他是项目取得成功的关键因素之一,并且已成为项目的领导者。

我必须要感谢 Erik Meijer,他在微软期间创建了 Rx。在他离开微软之后,我曾在 Netflix 就 RxJava 项目与他合作过。现在,我有幸和他一起在 Facebook 工作。和他在白板前花那么多的时间一起讨论是我的荣幸。Erik 这样的良师益友大大提高了我的思想水平。

在这个过程中,我还在很多会议上阐述了 RxJava 和反应式编程的相关内容,并结识了很多人,他们帮助我学习了关于代码和架构的更多知识。如果是自学的话,我不可能学到这么多。

Netflix 支持我在这个项目上花费时间和精力,并且在技术文档方面提供了支持。我自己可能永远也写不出这样的文档。如果没有“工作时间”的付出和拥有不同技能的许多人员的参与,这种成熟度和规模的开源项目是无法取得成功的。

在第 1 章中,我会试图讲解为何反应式编程是一种有用的编程方式,以及 RxJava 是如何具体实现这些原则的。

本书其余部分由托马什编写,他写得非常棒。我有机会审阅并提了一些建议,但这是他的书,从第 2 章开始他将负责详细内容的讲解。托马什 • 努尔凯维茨的说明

2013 年,我在一家金融机构工作的时候,第一次接触 RxJava。当时,我们要实时处理大量市场数据。那时数据管道的组成是这样的:Kafka 传递消息,Akka 处理交易,Clojure 转换数据,还有一个自定义构建的语言在整个系统中传播变更。RxJava 是一个非常具有吸引力的选择,因为它有一个统一的 API,能够很好地处理不同来源的数据。

随着时间的推移,我尝试在更多的场景中使用反应式编程。在这些场景中,可扩展性和吞吐量都至关重要。按照反应式的方式来实现系统肯定要求更高,但是它带来的好处更为重要,包括更高的硬件利用率以及由此带来的能源节省。为了充分理解这种编程模型的优势,开发人员必须拥有相对易用的工具。我们认为 Reactive Extensions 在抽象、复杂性以及性能之间实现了平衡。

除非特别说明,本书讲述的是 RxJava 1.1.6 版本。尽管 RxJava 支持 Java 6 及更高版本,但是几乎所有的示例都用到了 Java 8 中的 lambda 语法。在讨论 Android 的第 8 章中,有些示例展现了如何在不支持 lambda 表达式的环境中处理烦琐的语法。话虽如此,本书并不会一直采用最简短的语法(比如方法引用),这主要是为了在适当的场景下提升代码可读性。本书结构

如果你从头读到尾的话,将会收获最大。如果你没有那么多的时间,也可以选择最感兴趣的部分。如果某个概念在本书前面的章节中介绍过了,那么你很可能会找到关于参考章节的说明。如下是各章的简要介绍。

第 1 章简要介绍 RxJava 的起源、基本概念以及理念(本)。

第 2 章讨论如何将 RxJava 用于自己的应用程序,以及如何与它交互。这一章非常基础,但是理解一些概念是特别重要的,比如热源和冷源(托马什)。

第 3 章快速介绍 RxJava 提供的很多操作符,讲解一些具有表现力且强大的函数,它们是这个库的基础(托马什)。

第 4 章更加实用,展现如何将 RxJava 嵌入代码库的不同地方,还会简单介绍并发性(托马什)。

第 5 章内容更高级,阐述如何从头到尾实现反应式应用程序(托马什)。

第 6 章解释流控制中一个非常重要的问题,并介绍 RxJava 中的回压机制是如何解决该问题的(托马什)。

第 7 章介绍基于 Rx 的应用程序的单元测试、维护以及问题排查等相关技术(托马什)。

第 8 章展现一些 RxJava 应用程序,尤其是分布式系统中的程序(托马什)。

第 9 章重点介绍 RxJava 2.x 未来的计划(本)。在线资源

本书所有的弹珠图均来源于 RxJava 的官方文档,它们基于 Apache 许可证 2.0 版本发布。排版约定

本书使用了下列排版约定。

黑体字  表示新术语或重点强调的内容。

等宽字体(constant width)  表示程序片段,以及正文中出现的变量、函数名、数据库、数据类型、环境变量、语句和关键字等。

加粗等宽字体(constant width bold)  表示应该由用户输入的命令或其他文本。

等宽斜体(constant width italic)  表示应该由用户输入的值或根据上下文确定的值替换的文本。 该图标表示提示或建议。 该图标表示一般注记。 该图标表示警告或警示Safari® Books Online

Safari Books Online(http://www.safaribooksonline.com)是应运而生的数字图书馆。它同时以图书和视频的形式出版世界顶级技术和商务作家的专业作品。技术专家、软件开发人员、Web 设计师、商务人士和创意专家等,在开展调研、解决问题、学习和认证培训时,都将 Safari Books Online 视作获取资料的首选渠道。

对于组织团体、政府机构和个人,Safari Books Online 提供各种产品组合和灵活的定价策略。用户可通过一个功能完备的数据库检索系统访问 O'Reilly Media、Prentice Hall Professional、Addison-Wesley Professional、Microsoft Press、Sams、Que、Peachpit Press、Focal Press、Cisco Press、John Wiley & Sons、Syngress、Morgan Kaufmann、IBM Redbooks、Packt、Adobe Press、FT Press、Apress、Manning、New Riders、McGraw-Hill、Jones & Bartlett、Course Technology 以及其他几十家出版社的上千种图书、培训视频和正式出版之前的书稿。要了解 Safari Books Online 的更多信息,我们网上见。联系我们

请把对本书的评价和问题发给出版社。

美国:  O'Reilly Media, Inc.  1005 Gravenstein Highway North  Sebastopol, CA 95472

中国:  北京市西城区西直门南大街 2 号成铭大厦 C 座 807 室(100035)  奥莱利技术咨询(北京)有限公司

O'Reilly 的每一本书都有专属网页,你可以在那儿找到本书的相关信息,包括勘误表、示例代码以及其他信息。本书的网站地址是 http://bit.ly/reactive-prog-with-rxjava。

对于本书的评论和技术性问题,请发送电子邮件到:bookquestions@oreilly.com

要了解更多 O'Reilly 图书、培训课程、会议和新闻的信息,请访问以下网站:  http://www.oreilly.com

我们在 Facebook 的地址如下:http://facebook.com/oreilly

请关注我们的 Twitter 动态:http://twitter.com/oreillymedia

我们的 YouTube 视频地址如下:http://www.youtube.com/oreillymedia致谢来自本的致谢

如果没有托马什的话,这本书将不会存在,他编写了这本书大部分的内容。当然,还有我们的编辑 Nan Barber,他为我们提供了很多帮助并且非常有耐心,陪我们坚持到了最后。感谢托马什在 Twitter 上答复我寻找作者的消息,最终将这本书变成现实。

我还要感谢 Netflix 开源组织和 Daniel Jacobson 多年来对我本人和项目的支持。他们是这个项目的赞助者,并让我将大量的时间用在了这个社区上。非常感谢!

感谢 Erik 创建了 Rx、教会我很多东西,并为本书作序。来自托马什的致谢

首先,我要感谢我的父母,大约 20 年前他们给了我第一台计算机(带有 8 MB 内存的 486DX2,我永远也不会忘记)。我的编程之旅就是这样开始的。有很多人为这本书的撰写做出了贡献。首先是本,他同意编写本书的第 1 章和最后一章,并审阅了我编写的内容。

提到审阅者,Venkat Subramaniam 花费了很多精力,让我能够用一种恰当且一致的方式来组织本书内容。他经常建议我使用不同的句子、段落和章节顺序组织内容,甚至建议删除整页不相关的内容。我们另外一位审阅者是极有见识和经验的 Dávid Karnok。作为 RxJava 的项目负责人,他发现了许多 bug、竞态条件、不一致性和其他问题。这两位审阅者都提供了数百条评论,极大地提高了这本书的质量。在这本书的最初阶段,我的许多同事阅读了手稿,并给出了非常有价值的反馈。我要感谢 Dariusz Baciński、Szymon Homa、Piotr Pietrzak、Jakub Pilimon、Adam Wojszczyk、Marcin Zajączkowski 和 Maciej Ziarko。电子书

扫描如下二维码,即可购买本书中文电子版。第 1 章 使用 RxJava 实现反应式编程本 • 克里斯滕森(Ben Christensen)

RxJava 是对 Java 和 Android 进行反应式编程的具体实现,它受到了函数式编程的影响。RxJava 倡导函数组合,避免出现全局状态和副作用,并且要以流的方式思考,进而组合异步和基于事件的程序。它起源于观察者模式(observer pattern)的生产者 / 消费者回调,并且扩展了几十个操作符来实现组合、转换、调度、节流、错误处理以及生命周期管理。

RxJava 是一个成熟的开源库,已经被服务器端和 Android 移动设备广泛采用。除了这个库之外,开发人员还围绕 RxJava 和反应式编程构建了一个活跃的社区,主要用来改进项目、互相交流、撰写文章以及提供帮助。

这一章将概述 RxJava,讨论什么是 RxJava,以及它如何运行。本书的其余部分会带你了解 RxJava 的全部细节,以及如何将其用于应用程序。你在阅读本书的时候,可以没有任何反应式编程的经验,因为本书会从头开始,带领你了解 RxJava 的理念和实践,以便将它的优势应用到具体用例中。1.1 反应式编程与RxJava反应式编程(reactive programming)是一个通用的编程术语,它主要关注对变更做出反应,比如数据值或事件。反应式编程通常可以按照命令式(imperative)的方式来实现。回调就是一种以命令式实现反应式编程的方法。电子表格是反应式编程的一个绝佳例子:某些单元格依赖于其他的单元格,如果被依赖的单元格发生变化,这些单元格也会随之“做出反应”。函数式反应编程?尽管 Reactive Extensions(通常指Rx,特指的话是

RxJava)受到了函数式编程的影响,但它并不是函数式反

应编程(Functional Reactive Programming,FRP)。FRP

是非常具体的一种反应式编程类型,它涉及连续的时间,

而 RxJava 只处理随时间推移出现的离散事件。在Rx Java

的早期,我本人也陷入了这样的命名陷阱,将其宣传为“函

数式反应”,后来我发现“函数式反应”在数年前就已经被

别人定义了。因此除了“反应式编程”之外,没有一个公认

的通用术语来描述RxJava。FRP 还是经常被误用于描述

RxJava 和类似的方案。对于是应该拓展 FRP 的含义(因为

在过去的几年间,它已经被非正式地使用了),还是让

FRP 止于关注连续时间的实现,互联网上也时有争论。为了消除疑虑,可以把重点放在 RxJava 确实受到了函

数式编程的影响,并且有意地采取了与命令式编程不同的编

程模型。这一章提到“反应式”时,指的是 RxJava 使用的

反应式 + 函数式风格。与之相对,提到“命令式”时,并

不是说反应式编程不能以命令式的方式实现,强调的是使用

命令式的方式来编程,而不是 RxJava 的函数式风格。专门

对比命令式方式和函数式方式时,为了准确起见,会使用“反应式 - 函数式”和“反应式 - 命令式”。

在现在的计算机中,当涉及操作系统和硬件时,一切都会变成命令式的。开发人员必须明确告诉计算机要完成什么以及如何实现。人类不会像 CPU 和相关系统那样思考,所以我们添加了抽象。反应式 - 函数式编程就是一种抽象,就像高层级命令式编程术语是对底层二进制和汇编指令的抽象一样。记住并理解“一切都会变成命令式的”很重要,因为它能够帮助理解反应式 - 函数式编程的思维模型,并理解它最终是如何执行的,这里并没有什么魔法。

因此,作为一种编程方式,反应式 - 函数式编程是命令式系统之上的一种抽象。它允许开发人员在编写异步和事件驱动的用例时不用像计算机本身那样思考,也不用以命令式的方式来定义复杂的状态交互,尤其是跨线程和网络边界时。在处理异步和事件驱动的系统时,不用像计算机那样思考是一项有用的特质,因为这种情况会涉及并发和并行,而要正确和高效地使用这些功能是非常具有挑战性的。Brian Goetz 的著作《Java 并发编程实战》、Doug Lea 的著作 Concurrent Programming in Java 以及像 Mechanical Sympathy 这样的论坛,都表明了掌握并发所面临的深度、广度以及复杂性。使用 RxJava 以来,通过与这些书的作者、论坛和社区的专家的交流,我更加确信编写高性能、高效、可扩展和正确处理并发的软件相当不容易。这还没有将分布式系统考虑进来呢,它将并发性和并行性的难度提高了一截。

所以,简而言之,反应式 - 函数式编程解决的问题就是并发和并行。更通俗地说,它解决了回调地狱问题。回调地狱是以命令式的方式来处理反应式和异步用例带来的问题。反应式编程,比如 RxJava 实现,受到了函数式编程的影响,并且会使用声明式的方式来避免反应式 - 命令式代码常见的问题。1.2 何时需要反应式编程

反应式编程在如下场景中非常有用。● 处理用户事件,比如鼠标移动和单击、键盘输入、GPS 信号因

用户设备的移动而不断变化、设备陀螺仪信号和触摸事件等。● 响应和处理来自磁盘或网络的所有延迟受限的 IO 事件,IO 本质

上是异步的(发起请求,时间推移,可能收到也可能收不到响应,

触发下一步事件)。● 在应用程序中处理由该应用程序无法控制的生产者推送过来的事

件或数据(来自服务器的系统事件、上述用户事件、来自硬件的

信号、模拟世界中由传感器触发的事件等)。

如果涉及的代码只处理一个事件流,那么使用带有回调的反应式 - 命令式编程就很好,引入反应式 - 函数式编程并不会带来太多的收益。如果你有数百个不同的事件流,而且它们彼此独立,命令式编程也不会有太大的问题。在这种直接的使用场景中,命令式是最高效的方式,因为它消除了反应式编程的抽象层,并且更加契合对当前操作系统、语言和编译器的优化。

如果你的程序像大多数程序一样,那么你需要组合事件(或者函数或网络调用的异步响应)、包含事件交互的条件逻辑,而且在所有调用之后必须处理故障场景和清理资源。在这种情况下,反应式 - 命令式的复杂性会急剧增加,而反应式 - 函数式编程则能体现出它的价值了。我认同一个未经科学验证的观点,那就是反应式 - 函数式编程难入门而且学习曲线较陡峭,但是它的复杂性要远远低于反应式 - 命令式编程。

这就是称 Reactive Extensions(Rx)和 RxJava 是“用于组合异步和基于事件的程序的库”的原因。RxJava 是反应式编程原则的具体实现,受到了函数式以及数据流编程的影响。其实,我们有不同的方式来实现“反应式”,RxJava 只是其中之一。接下来深入研究一下它是如何运行的。1.3 RxJava是如何运行的

RxJava 的核心是 Observable 类型,它代表了数据或事件的流。它的目的是实现推送(反应式),但是也可以用于拉取(交互式)。它是延迟执行的(lazy),不是立即执行的(eager)。它可以同步使用,也可以异步使用。它能够代表随着时间推移产生的 0 个、1 个、多个或者无穷个值或事件。

这涉及很多的术语和细节,需要一一介绍,2.1 节将介绍完整的细节。1.3.1 推送与拉取

RxJava 实现反应式的要点在于它支持推送,所以 Observable 和关联的 Observer 类型签名支持把事件推送给它。这通常会伴随着异步,1.3.2 节会进行讨论。但是,Observable 类型还支持一个异步的反馈通道(有时也称为异步 - 拉取或反应式拉取),作为异步系统中的一种流控制或回压方式。本章后面会讨论流控制,以及如何使用该机制。

为了支持接收推送来的事件,Observable/Observer 通过订阅进行连接。Observable 代表了数据流,它可以被 Observer 订阅(2.2 节会介绍更多内容)。interface Observable { Subscription subscribe(Observer s)}

订阅之后,Observer 就能够接收三种推送给它的事件。● 通过 onNext() 函数推送的数据。● 通过 onError() 函数推送的错误(异常或 Throwable)。● 通过 onCompleted() 函数推送的流完成信息。interface Observer { void onNext(T t) void onError(Throwable t) void onCompleted()}

其中,onNext() 可能永远也不会被调用,也可能会被调用一次、多次或无数次。onError() 和 onCompleted() 是终端事件,这意味着两者只能有一个被调用,并且只能被调用一次。终端事件被调用之后,Observable 流就完成了,以后就不能再向它发送事件了。如果流是无限的,并且没有发生故障,那么终端事件可能永远不会发生。

6.1 节和 6.2 节将会展示另外一种类型签名,它支持交互式拉取。interface Producer { void request(long n)}

它可以与更加高级的 Observer 协同使用,即 Subscriber(2.3 节提供了更多的细节)。abstract class Subscriber implements Observer, Subscription { void onNext(T t) void onError(Throwable t) void onCompleted() ... void unsubscribe() void setProducer(Producer p)}

Subscription 接口中包含了 unsubcribe 函数,该函数能够允许订阅者取消对某个 Observable 流的订阅。setProducer 函数和 Producer 类型用来在生产者和消费者之间建立一个双向的通信通道,该通道用于流控制。1.3.2 异步与同步

一般而言,Observable 是异步的,但它并非总是如此。Observable 可以是同步的,事实上,它默认就是同步的。除非要求,否则 RxJava 永远不会添加并发功能。同步的 Observable 将被订阅,使用订阅者的线程发布(emit)所有数据并且完成(如果是有限 Observable 的话)。由阻塞式网络 I/O 支撑的 Observable 将会同步阻塞订阅线程,并在阻塞网络 I/O 返回时,通过 onNext() 发布数据。

例如,以下代码示例完全是同步的。Observable.create(s -> { s.onNext("Hello World!"); s.onCompleted();}).subscribe(hello -> System.out.println(hello));

2.2 节和 2.4.1 节将介绍 Observable.create 和 Observable.subscribe 的更多知识。

现在,你可能会想,这并不是反应式系统的理想行为。你是对的!将同步阻塞 I/O 与 Observable 组合使用是一种很糟糕的形式(如果确实要使用阻塞 I/O 的话,它需要以线程的方式进行异步化)。但是,有时候从内存缓存中同步获取数据并立即返回也是一种恰当的做法。前面的“Hello World”样例并不需要并发性,事实上,如果为其添加异步调度的话,它将会慢得多。因此,通常来讲,实际上重要的标准是 Observable 生成事件的过程是阻塞的还是非阻塞的,而非它是同步的还是异步的。“Hello World”样例是非阻塞的,因为它永远不会阻塞线程,所以使用 Observable 是正确的(尽管看上去有些多余)。

实际上,RxJava 的 Observable 不知道异步与同步,也不知道并发性是否存在以及来自何处。设计就是如此,这允许 Observable 的实现来决定什么做法是最好的。为什么说这是有用的呢?

首先,并发性可以来自多个地方,而不仅仅是线程池。如果数据源已经借助事件循环(event loop)实现了异步,那么 RxJava 不应该添加更多的调度开销或者强制使用特定的调度实现。并发性可以来自线程池、事件循环、Actor 等。它可能是手动添加的,也可能来源于数据源。异步性来自何处,RxJava 并不知晓。

其次,使用同步的行为有两个很好的理由,下面会进行阐述。1. 内存数据如果数据在本地内存缓存中(查询时间固定在毫秒 / 纳秒级

别),那么再花费调度成本将其异步化就没有意义了。

Observable 可以同步地获取数据,并将其发布到订阅线程上,

如下所示。

Observable.create(s -> {

s.onNext(cache.get(SOME_KEY));

s.onCompleted();

}).subscribe(value -> System.out.println(value));不清楚数据是否在内存中的时候,调度选择是很重要的。如

果数据在内存中,就采用同步的方式进行发布;如果不在内存中,

就执行异步的网络调用,并在数据到达的时候将其返回。这种选

择可以放到一个条件化的 Observable 中。

//伪代码

Observable.create(s -> {

T fromCache = getFromCache(SOME_KEY);

if(fromCache != null) {

//同步发布

s.onNext(fromCache);

s.onCompleted();

} else {

//异步抓取

getDataAsynchronously(SOME_KEY)

.onResponse(v -> {

putInCache(SOME_KEY, v);

s.onNext(v);

s.onCompleted();

})

.onFailure(exception -> {

s.onError(exception);

});

}

}).subscribe(s -> System.out.println(s));2. 同步计算(如操作符)保持同步的更常见原因是通过操作符进行流组合和转换。

RxJava 会使用大量的操作符 API 来操作、组合和转换数据,比

如 map()、filter()、take()、flatMap() 和 groupBy()。大多数这样的

操作符是同步的,这意味着在事件经过的时候,它们会在

onNext() 中执行同步计算。出于性能原因,这些操作符都是同步的。以下面的代码为例。

Observable o = Observable.create(s -> {

s.onNext(1);

s.onNext(2);

s.onNext(3);

s.onCompleted();

});

o.map(i -> "Number " + i)

.subscribe(s -> System.out.println(s));假如 map 操作符默认是异步的,(1, 2, 3) 中的每个数字都会

调度到一个线程上,在这个线程上将会执行字符串连接("Number " + i)。这是非常低效的,调度、上下文切换等一般还

会造成不确定的延迟。这里需要着重理解的就是大多数的 Observable 函数管道是

同步的(除非某个特定的操作符需要是异步的,比如 timeout 或

observeOn),而 Observable 本身可以是异步的。这些主题在

4.9.5 节和 7.1.3 节会深入介绍。如下的样例展现了同步和异步的混合使用。

Observable.create(s -> {

... async subscription and data emission ...

})

.doOnNext(i -> System.out.println(Thread.currentThread()))

.filter(i -> i % 2 == 0)

.map(i -> "Value " + i + " processed on " + Thread.currentThread())

.subscribe(s -> System.out.println("SOME VALUE =>" + s));

System.out.println("Will print BEFORE values are emitted")本例中的 Observable 是异步的(它会在与订阅者不同的线

程中发布事件),所以订阅是非阻塞的,最后的 println 的输出将

会早于事件的传播,并先于“SOME VALUE ==>”显示。但是,filter() 和 map() 函数是同步执行的,它们会在调用发

布事件的线程中执行。一般来说,这是期望的行为:实现异步的

管道(Observable 和组合操作符),让事件保持高效的同步计算。因此,Observable 类型本身支持同步和异步的具体实现,

其设计本意即是如此。1.3.3 并发与并行

单个的 Observable 流既不允许并发,也不允许并行。相反,它们是通过组合异步 Observable 来实现的。

并行(parallelism)指的是任务同时执行,通常会在不同的 CPU 或机器上。而并发(concurrency)指的是多任务的组合和交叉。如果一个 CPU 上面有多个任务的话(比如线程),它们是通过“时间分片”实现的并发,而不是并行。每个线程会得到一部分的 CPU 时间,然后即便该线程尚未完成,也要将 CPU 时间让给其他线程。

根据定义,并行执行是并发的,但是并发不一定是并行的。实际上,这意味着多线程是并发的,但是只有这些线程同时被调度到不同的 CPU 上并在其上执行时,才是并行。因此,通常我们会讨论并发性和如何并发,并行则是一种特定形式的并发。

RxJava 的 Observable 契约要求事件(onNext()、onCompleted()、onError())始终避免并发发布。换句话说,单个 Observable 流必须始终是序列化和线程安全的。每个事件可以从不同的线程中发布出来,只要发布不是并发的即可。这意味着 onNext() 没有交叉或同时执行。如果 onNext() 依然还在某个线程上执行,那么其他的线程将不能再次调用它(交叉)。

下面的样例展现了正确的代码。Observable.create(s -> { new Thread(() -> { s.onNext("one"); s.onNext("two"); s.onNext("three"); s.onNext("four"); s.onCompleted(); }).start();});

这段代码顺序地发布数据,所以它符合契约。(注意,一般不建议这样在 Observable 中启动线程,而应使用调度器,参见 4.9 节中的讨论。)

如下的样例展现了非法代码。//不要这样做Observable.create(s -> { //线程A new Thread(() -> { s.onNext("one"); s.onNext("two"); }).start(); //线程B new Thread(() -> { s.onNext("three"); s.onNext("four"); }).start(); //由于线程竞争,不需要发布s.onCompleted()});//不要这样做

这段代码是非法的,因为它的两个线程能够并发地调用 onNext() ,这破坏了契约。(同时,它还需要安全地等待两个线程都完成才能安全地调用 onComplete,如前所述,这样手动启动线程很糟糕。)

那么,该如何结合 RxJava 发挥并发和并行的优势呢?那就是组合。

单个 Observable 流始终是序列化的,但是每个 Observable 可以独立于其他 Observable 来进行操作,因此能够实现并发 / 并行。这也是 merge 和 flatMap 在 RxJava 中如此常用的原因所在——并发地将异步流组合在一起(参见 3.1.2 节和 3.2.1 节)。

如下是编造的一个例子,展示了在各自线程中运行异步 Observable 以及将其合并起来的机制。Observable a = Observable.create(s -> { new Thread(() -> { s.onNext("one"); s.onNext("two"); s.onCompleted(); }).start();});Observable b = Observable.create(s -> { new Thread(() -> { s.onNext("three"); s.onNext("four"); s.onCompleted(); }).start();});//并发订阅a和b,并将它们合并到第三个序列化流中Observable c = Observable.merge(a, b);

Observable c 将会收到来自 a 和 b 的条目,因为它们的异步性,将会发生以下三件事。● one 会出现在 two 之前。● three 会出现在 four 之前。● one/two 和 three/four 的顺序是不确定的。

那么,到底为什么不允许并发地调用 onNext() 呢?

主要原因在于 onNext() 本意是供人类使用的,并发比较困难。假如能够并发调用 onNext(),这就意味着所有 Observer 都需要为并发调用进行防御性编码,即便本来并不期望或想要这样调用。

第二个原因在于有些操作无法实现并发发布,比如 scan 和 reduce,它们是非常常见且重要的行为。像 scan 和 reduce 这样的操作符需要有顺序的事件传播,这样状态才能在事件流上累积,这些事件不能兼具组合性(associative)和可交换性(commutative)。允许并发的 Observable 流(具有并发的 onNext())将限制能够处理的事件类型,并且需要线程安全的数据结构。 Java 8 的 Stream 类型支持并发发布。这也是

java.util.stream.Stream 需要 reduce 函数具备组合性的原

因,它们必须支持在并行流上并发调用。在 java.util.stream

包的文档中,关于并行、排序(与交换性相关)、reduction

操作以及组合性的部分进一步阐述了相同 Stream 类型允许

顺序发布和并发发布的复杂性。

第三个原因在于同步开销会影响性能,因为所有的 Observer 和操作符都需要是线程安全的,即便大多数情况下数据是顺序到达的。尽管 JVM 通常擅长消除同步开销,但是并非在所有的场景中都如此(尤其是使用原子化的非阻塞算法时更是如此),最终导致顺序流上产生不必要的性能开销。

除此之外,进行一般的细粒度并行通常会更慢。并行一般需要在较粗的粒度上进行,比如批处理工作,以弥补切换线程、调度工作和重新组合的开销。如果在单个线程上同步执行,并充分利用针对有顺序的计算的内存和 CPU 优化,那么将会高效得多。对于 List 或 array 来说,为批处理并行找到合适的默认做法是很容易的,因为所有的条目都是提前可知的,并且能够划分为批处理(但是即便如此,通常来讲在一个 CPU 上处理整个列表也会更快,除非列表非常庞大或者对每个条目的处理非常耗时)。但是,流无法预先了解工作的情况,只能通过 onNext() 接收数据,因此无法自动对工作进行分块。

实际上,在 v1 版本之前,RxJava 添加过一个 .parallel(Function f) 操作符,它的行为类似于 java.util.stream.Stream.parallel(),当时认为这是非常便利的。它的实现方式并没有打破 RxJava 的契约,首先将一个 Observable 分割为多个并行执行的 Observable,随后再将它们合并到一起。但是它在该库更新到 v1 版本之前被删除了,因为它令人费解,并且总是会降低性能。为事件流添加并行计算通常都需要进行推理和测试。也许,ParallelObservable 能够提供一些帮助,为此操作符被限定在具备结合性的子集中。但是在使用 RxJava 的时代并不值得尝试,因为组合使用 merge 和 flatMap 就是针对这种用案的有效构造。

第 3 章将会讲解如何使用操作符组合 Observable,使其能够从并发和并行中受益。

试读结束[说明:试读内容隐藏了图片]

下载完整电子书


相关推荐

最新文章


© 2020 txtepub下载