Spark机器学习(txt+pdf+epub+mobi电子书下载)


发布时间:2020-08-03 14:35:48

点击下载

作者:彭特里思(Nick Pentreath)

出版社:人民邮电出版社

格式: AZW3, DOCX, EPUB, MOBI, PDF, TXT

Spark机器学习

Spark机器学习试读:

前言

近年来,被收集、存储和分析的数据量呈爆炸式增长,特别是与网络、移动设备相关的数据,以及传感器产生的数据。大规模数据的存储、处理、分析和建模,以前只有Google、Yahoo!、Facebook和Twitter这样的大公司才涉及,而现在越来越多的机构都会面对处理海量数据的挑战。

面对如此量级的数据以及常见的实时利用该数据的需求,人工驱动的系统难以应对。这就催生了所谓的大数据和机器学习系统,它们从数据中学习并可自动决策。

为了能以低成本实现对大规模数据的支持,Google、Yahoo!、Amazon和Facebook涌现了大量开源技术。这些技术旨在通过在计算机集群上进行分布式数据存储和计算来简化大数据处理。

这些技术中最广为人知的是Apache Hadoop,它极大简化了海量数据的存储(通过Hadoop Distributed File System,即HDFS)和计算(通过Hadoop MapReduce,一种在集群里多个节点上进行并行计算的框架)流程,并降低了相应的成本。

然而,MapReduce有其严重的缺点,如启动任务时的高开销、对中间数据和计算结果写入磁盘的依赖。这些都使得Hadoop不适合迭代式或低延迟的任务。Apache Spark是一个新的分布式计算框架,从设计开始便注重对低延迟任务的优化,并将中间数据和结果保存在内存中。Spark提供简洁明了的函数式API,并完全兼容Hadoop生态系统。

不止如此,Spark还提供针对Scala、Java和Python语言的原生API。通过Scala和Python的API,Spark应用程序可充分利用Scala或Python语言的优势。这些优势包括使用相关的解释程序进行实时交互式的程序编写。Spark目前还自带一个分布式机器学习和数据挖掘工具包MLlib。经过重点开发,这个包中已经包括一些针对常见计算任务的高质量、可扩展的算法。本书会涉及其中的部分算法。

在大型数据集上进行机器学习颇具挑战性。这主要是因为常见的机器学习算法并非为并行架构而设计。大部分情况下,设计这样的算法并不容易。机器学习模型一般具有迭代式的特性,而这与Spark的设计目标一致。并行计算的框架有很多,但很少能在兼顾速度、可扩展性、内存处理和容错性的同时,还提供灵活、表达力丰富的API。Spark是其中为数不多的一个。

本书将关注机器学习技术的实际应用。我们会简要介绍机器学习算法的一些理论知识,但总的来说本书注重技术实践。具体来说,我们会通过示例程序和样例代码,举例说明如何借助Spark、MLlib以及其他常见的免费机器学习和数据分析套件来创建一个有用的机器学习系统。

本书内容

第1章 “Spark的环境搭建与运行”,会讲到如何安装和搭建Spark框架的本地开发环境,以及怎样使用Amazon EC2在云端创建Spark集群。之后介绍Spark编程模型和API。最后分别用Scala、Java和Python语言创建一个简单的Spark应用。

第2章 “设计机器学习系统”,会展示一个贴合实际的机器学习系统案例。随后会针对该案例设计一个基于Spark的智能系统所对应的高层架构。

第3章 “Spark上数据的获取、处理与准备”,会详细介绍如何从各种免费的公开渠道获取用于机器学习系统的数据。我们将学到如何进行数据处理和清理,并通过可用的工具、库和Spark函数将它们转换为符合要求的数据,使之具备可用于机器学习模型的特征。

第4章 “构建基于Spark的推荐引擎”,展示了如何创建一个基于协同过滤的推荐模型。该模型将用于向给定用户推荐物品,以及创建与给定物品相似的物品。这一章还会讲到如何使用标准指标来评估推荐模型的效果。

第5章 “Spark构建分类模型”,阐述如何创建二元分类模型,以及如何利用标准的性能评估指标来评估分类效果。

第6章 “Spark构建回归模型”,扩展了第5章中的分类模型以创建一个回归模型,并详细介绍回归模型的评估指标。

第7章 “Spark构建聚类模型”,探索如何创建聚类模型以及相关评估方法的使用。你会学到如何分析和可视化聚类结果。

第8章 “Spark应用于数据降维”,将通过多种方法从数据中提取其内在结构并降低其维度。你会学到一些常见的降维方法,以及如何对它们进行应用和分析。这里还会讲到如何将降维的结果作为其他机器学习模型的输入。

第9章 “Spark高级文本处理技术”,介绍处理大规模文本数据的方法。这包括从文本提取特征以及处理文本数据常见的高维特征的方法。

第10章 “Spark Streaming在实时机器学习上的应用”,对Spark Streaming进行综述,并介绍在流数据上的机器学习中它如何实现对在线和增量学习方法的支持。

预备知识

本书假设读者已有基本的Scala、Java或Python编程经验,以及机器学习、统计学和数据分析方面的基础知识。

本书目标

本书的预期读者是初中级数据科学研究者、数据分析师、软件工程师和对大规模环境下的机器学习或数据挖掘感兴趣的人。读者不需要熟悉Spark,但若具有统计、机器学习相关软件(比如MATLAB、scikit-learn、Mahout、R和Weka等)或分布式系统(如Hadoop)的实践经验,会很有帮助。

排版约定

在本书中,你会发现一些不同的文本样式,用以区别不同种类的信息。下面举例说明。

代码段的格式如下:val conf = new SparkConf().setAppName("Test Spark App").setMaster("local[4]")val sc = new SparkContext(conf)

所有的命令行输入或输出的格式如下:>tar xfvz spark-1.2.0-bin-hadoop2.4.tgz>cd spark-1.2.0-bin-hadoop2.4

新术语和重点词汇以楷体标示。屏幕、目录或对话框上的内容这样表示:“这些信息可以从AWS主页上依次点击‘Account’ | ‘Security Credentials’ | ‘Access Credentials’看到。”这个图标表示警告或需要特别注意的内容。这个图标表示提示或者技巧。

读者反馈

欢迎提出反馈。如果你对本书有任何想法,喜欢它什么,不喜欢它什么,请让我们知道。要写出真正对大家有帮助的书,了解读者的反馈很重要。

一般的反馈,请发送电子邮件至feedback@packtpub.com,并在邮件主题中包含书名。

如果你有某个主题的专业知识,并且有兴趣写成或帮助促成一本书,请参考我们的作者指南http://www.packtpub.com/authors。

客户支持

现在,你是一位自豪的Packt图书的拥有者,我们会尽全力帮你充分利用你手中的书。下载示例代码

你可以用你的账户从http://www.packtpub.com下载所有已购买Packt图书的示例代码文件。如果你从其他地方购买本书,可以访问http://www.packtpub.com/support并注册,我们将通过电子邮件把文件发送给你。勘误表

虽然我们已尽力确保本书内容正确,但出错仍旧在所难免。如果你在我们的书中发现错误,不管是文本还是代码,希望能告知我们,我们不胜感激。这样做可以减少其他读者的困扰,帮助我们改进本书的后续版本。如果你发现任何错误,请访问http://www.packtpub.com/submit-errata提交,选择你的书,点击勘误表提交表单的链接,并输入详细说明。勘误一经核实,你的提交将被接受,此勘误将上传到本公司网站或添加到现有勘误表。从http://www.packtpub.com/support选择书名就可以查看现有的勘误表。

侵权行为

互联网上的盗版是所有媒体都要面对的

问题

。Packt非常重视保护版权和许可证。如果你发现我们的作品在互联网上被非法复制,不管以什么形式,都请立即为我们提供位置地址或网站名称,以便我们可以寻求补救。

请把可疑盗版材料的链接发到copyright@packtpub.com。

非常感谢你帮助我们保护作者,以及保护我们给你带来有价值内容的能力。问题

如果你对本书内容存有疑问,不管是哪个方面,都可以通过questions@packtpub.com联系我们,我们将尽最大努力来解决。

 

致谢

过去一年里,本书的写作过程如同过山车一般跌宕起伏,伴随着熬夜和周末加班。对机器学习和Apache Spark的热爱让我受益良多,也希望本书能让读者有所收获。

非常感谢Packt出版团队在本书写作和编辑过程中提供的帮助,感谢Rebecca、Susmita、Sudhir、Amey、Neil、Vivek、Pankaj和所有为本书出过力的人。

同样感谢StumbleUpon公司的Debora Donato,她提供过数据和法律方面的协助。

写书的过程可能会让人感到孤立无援,因此审校人的反馈对保证本书的可读性,以及知晓还需要作出哪些调整十分有帮助。我深深地感谢Andrea Mostosi、Hao Ren和Krishna Sankar花费时间审阅本书,并提供细致且极为重要的反馈。

家人和朋友的不懈支持是本书得以写成的必要因素。特别是我的好妻子Tammy,感谢她在若干个夜晚和周末的陪伴与支持。谢谢你们所有人!

最后,谢谢你阅读这本书,希望它对你能有所帮助。

 第1章Spark的环境搭建与运行

Apache Spark是一个分布式计算框架,旨在简化运行于计算机集群上的并行程序的编写。该框架对资源调度,任务的提交、执行和跟踪,节点间的通信以及数据并行处理的内在底层操作都进行了抽象。它提供了一个更高级别的API用于处理分布式数据。从这方面说,它与Apache Hadoop等分布式处理框架类似。但在底层架构上,Spark与它们有所不同。

Spark起源于加利福利亚大学伯克利分校的一个研究项目。学校当时关注分布式机器学习算法的应用情况。因此,Spark从一开始便为应对迭代式应用的高性能需求而设计。在这类应用中,相同的数据会被多次访问。该设计主要靠利用数据集内存缓存以及启动任务时的低延迟和低系统开销来实现高性能。再加上其容错性、灵活的分布式数据结构和强大的函数式编程接口,Spark在各类基于机器学习和迭代分析的大规模数据处理任务上有广泛的应用,这也表明了其实用性。关于Spark项目的更多背景信息,包括其开发的核心研究论文,可从项目的历史介绍页面中查到:http://spark.apache.org/community.html#history。

Spark支持四种运行模式。● 本地单机模式:所有Spark进程都运行在同一个Java虚拟机(Java Vitural Machine,JVM)中。● 集群单机模式:使用Spark自己内置的任务调度框架。● 基于Mesos:Mesos是一个流行的开源集群计算框架。● 基于YARN:即Hadoop 2,它是一个与Hadoop关联的集群计算

和资源调度框架。

本章主要包括以下内容。● 下载Spark二进制版本并搭建一个本地单机模式下的开发环境。

各章的代码示例都在该环境下运行。● 通过Spark的交互式终端来了解它的编程模型及其API。● 分别用Scala、Java和Python语言来编写第一个Spark程序。● 在Amazon的Elastic Cloud Compute(EC2)平台上架设一个

Spark集群。相比本地模式,该集群可以应对数据量更大、计算

更复杂的任务。通过自定义脚本,Spark同样可以运行在Amazon的Elastic MapReduce服务上,但这不在本书讨论范围内。相关信息可参考http://aws.amazon.com/articles/4926593393724923;本书写作时,这篇文章是基于Spark 1.1.0写的。

如果读者曾构建过Spark环境并有Spark程序编写基础,可以跳过本章。1.1Spark的本地安装与配置

Spark能通过内置的单机集群调度器来在本地运行。此时,所有的Spark进程运行在同一个Java虚拟机中。这实际上构造了一个独立、多线程版本的Spark环境。本地模式很适合程序的原型设计、开发、调试及测试。同样,它也适应于在单机上进行多核并行计算的实际场景。

Spark的本地模式与集群模式完全兼容,本地编写和测试过的程序仅需增加少许设置便能在集群上运行。

本地构建Spark环境的第一步是下载其最新的版本包(本书写作时为1.2.0版)。各个版本的版本包及源代码的GitHub地址可从Spark项目的下载页面找到:http://spark.apache.org/downloads.html。Spark的在线文档http://spark.apache.org/docs/latest/涵盖了进一步学习Spark所需的各种资料。强烈推荐读者浏览查阅。

为了访问HDFS(Hadoop Distributed File System,Hadoop分布式文件系统)以及标准或定制的Hadoop输入源,Spark的编译需要与Hadoop的版本对应。上述下载页面提供了针对Hadoop 1、CDH4(Cloudera的Hadoop发行版)、MapR的Hadoop发行版和Hadoop 2(YARN)的预编译二进制包。除非你想构建针对特定版本Hadoop的Spark,否则建议你通过如下链接从Apache镜像下载Hadoop 2.4预编译版本:http://www.apache.org/dyn/closer.cgi/spark/spark-1.2.0/spark-1.2.0-bin-hadoop2.4.tgz。

Spark的运行依赖Scala编程语言(本书写作时为2.10.4版)。好在预编译的二进制包中已包含Scala运行环境,我们不需要另外安装Scala便可运行Spark。但是,JRE(Java运行时环境)或JDK(Java开发套件)是要安装的(相应的安装指南可参见本书代码包中的软硬件列表)。

下载完上述版本包后,解压,并在终端进入解压时新建的主目录:>tar xfvz spark-1.2.0-bin-hadoop2.4.tgz >cd spark-1.2.0-bin-hadoop2.4

用户运行Spark的脚本在该目录的bin目录下。我们可以运行Spark附带的一个示例程序来测试是否一切正常:>./bin/run-example org.apache.spark.examples.SparkPi

该命令将在本地单机模式下执行SparkPi这个示例。在该模式下,所有的Spark进程均运行于同一个JVM中,而并行处理则通过多线程来实现。默认情况下,该示例会启用与本地系统的CPU核心数目相同的线程。示例运行完,应可在输出的结尾看到类似如下的提示:…14/11/27 20:58:47 INFO SparkContext: Job finished: reduce at SparkPi.scala:35, took 0.723269s Pi is roughly 3.1465…

要在本地模式下设置并行的级别,以local[N]的格式来指定一个master变量即可。上述参数中的N表示要使用的线程数目。比如只使用两个线程时,可输入如下命令:>MASTER=local[2] ./bin/run-example org.apache.spark.examples.SparkPi1.2Spark集群

Spark集群由两类程序构成:一个驱动程序和多个执行程序。本地模式时所有的处理都运行在同一个JVM内,而在集群模式时它们通常运行在不同的节点上。

举例来说,一个采用单机模式的Spark集群(即使用Spark内置的集群管理模块)通常包括:● 一个运行Spark单机主进程和驱动程序的主节点;● 各自运行一个执行程序进程的多个工作节点。

在本书中,我们将使用Spark的本地单机模式做概念讲解和举例说明,但所用的代码也可运行在Spark集群上。比如在一个Spark单机集群上运行上述示例,只需传入主节点的URL即可:>MASTER=spark://IP:PORT ./bin/run-example org.apache.spark.examples.SparkPi

其中的IP和PORT分别是主节点IP地址和端口号。这是告诉Spark让示例程序运行在主节点所对应的集群上。

Spark集群管理和部署的完整方案不在本书的讨论范围内。但是,本章后面会对Amazon EC2集群的设置和使用做简要说明。Spark集群部署的概要介绍可参见如下链接:● http://spark.apache.org/docs/latest/cluster-overview.html● http://spark.apache.org/docs/latest/submitting-applications.html1.3Spark编程模型

在对Spark的设计进行更全面的介绍前,我们先介绍SparkContext对象以及Spark shell。后面将通过它们来了解Spark编程模型的基础知识。虽然这里会对Spark的使用进行简要介绍并提供示例,但要想了解更多,可参考下面这些资料。● Spark快速入门:http://spark.apache.org/docs/latest/quick-start.html。● 针对Scala、Java和Python的《Spark编程指南》:http://spark.apache.org/docs/latest/programming-guide.html。1.3.1 SparkContext类与SparkConf类

任何Spark程序的编写都是从SparkContext(或用Java编写时的JavaSparkContext)开始的。SparkContext的初始化需要一个SparkConf对象,后者包含了Spark集群配置的各种参数(比如主节点的URL)。

初始化后,我们便可用SparkContext对象所包含的各种方法来创建和操作分布式数据集和共享变量。Spark shell(在Scala和Python下可以,但不支持Java)能自动完成上述初始化。若要用Scala代码来实现的话,可参照下面的代码:val conf = new SparkConf().setAppName("Test Spark App").setMaster("local[4]")val sc = new SparkContext(conf)

这段代码会创建一个4线程的SparkContext对象,并将其相应的任务命名为Test Spark APP。我们也可通过如下方式调用SparkContext的简单构造函数,以默认的参数值来创建相应的对象。其效果和上述的完全相同:val sc = new SparkContext("local[4]", "Test Spark App")下载示例代码你可从http://www.packtpub.com下载你账号购买过的Packt书籍所对应的示例代码。若书是从别处购买的,则可在https://www.packtpub.com/books/content/support注册,相应的代码会直接发送到你的电子邮箱。1.3.2 Spark shell

Spark支持用Scala或Python REPL(Read-Eval-Print-Loop,即交互式shell)来进行交互式的程序编写。由于输入的代码会被立即计算,shell能在输入代码时给出实时反馈。在Scala shell里,命令执行结果的值与类型在代码执行完后也会显示出来。

要想通过Scala来使用Spark shell,只需从Spark的主目录执行./bin/spark-shell。它会启动Scala shell并初始化一个SparkContext对象。我们可以通过sc这个Scala值来调用这个对象。该命令的终端输出应该如下图所示:

要想在Python shell中使用Spark,直接运行./bin/pyspark命令即可。与Scala shell类似, Python下的SparkContext对象可以通过Python变量sc来调用。上述命令的终端输出应该如下图所示:1.3.3 弹性分布式数据集

RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark的核心概念之一。一个RDD代表一系列的“记录”(严格来说,某种类型的对象)。这些记录被分配或分区到一个集群的多个节点上(在本地模式下,可以类似地理解为单个进程里的多个线程上)。Spark中的RDD具备容错性,即当某个节点或任务失败时(因非用户代码错误的原因而引起,如硬件故障、网络不通等),RDD会在余下的节点上自动重建,以便任务能最终完成。

1. 创建RDD

RDD可从现有的集合创建。比如在Scala shell中:val collection = List("a", "b", "c", "d", "e")val rddFromCollection = sc.parallelize(collection)

RDD也可以基于Hadoop的输入源创建,比如本地文件系统、HDFS和Amazon S3。基于Hadoop的RDD可以使用任何实现了Hadoop InputFormat接口的输入格式,包括文本文件、其他Hadoop标准格式、HBase、Cassandra等。以下举例说明如何用一个本地文件系统里的文件创建RDD:val rddFromTextFile = sc.textFile("LICENSE")

上述代码中的textFile函数(方法)会返回一个RDD对象。该对象的每一条记录都是一个表示文本文件中某一行文字的String(字符串)对象。

2. Spark操作

创建RDD后,我们便有了一个可供操作的分布式记录集。在Spark编程模式下,所有的操作被分为转换(transformation)和执行(action)两种。一般来说,转换操作是对一个数据集里的所有记录执行某种函数,从而使记录发生改变;而执行通常是运行某些计算或聚合操作,并将结果返回运行SparkContext的那个驱动程序。

Spark的操作通常采用函数式风格。对于那些熟悉用Scala或Python进行函数式编程的程序员来说,这不难掌握。但Spark API其实容易上手,所以那些没有函数式编程经验的程序员也不用担心。

Spark程序中最常用的转换操作便是map操作。该操作对一个RDD里的每一条记录都执行某个函数,从而将输入映射成为新的输出。比如,下面这段代码便对一个从本地文本文件创建的RDD进行操作。它对该RDD中的每一条记录都执行size函数。之前我们曾创建过一个这样的由若干String构成的RDD对象。通过map函数,我们将每一个字符串都转换为一个整数,从而返回一个由若干Int构成的RDD对象。val intsFromStringsRDD = rddFromTextFile.map(line => line.size)

其输出应与如下类似,其中也提示了RDD的类型:intsFromStringsRDD: org.apache.spark.rdd.RDD[Int] = MappedRDD[5] at map at :14

示例代码中的=>是Scala下表示匿名函数的语法。匿名函数指那些没有指定函数名的函数(比如Scala或Python中用def关键字定义的函数)。匿名函数的具体细节并不在本书讨论范围内,但由于它们在Scala、Python以及Java 8中大量使用(示例或现实应用中都是),列举一些实例仍会有帮助。语法line => line.size表示以=>操作符左边的部分作为输入,对其执行一个函数,并以=>操作符右边代码的执行结果为输出。在这个例子中,输入为line,输出则是line.size函数的执行结果。在Scala语言中,这种将一个String对象映射为一个Int的函数被表示为String => Int。该语法使得每次使用如map这种方法时,都不需要另外单独定义一个函数。当函数简单且只需使用一次时(像本例一样时),这种方式很有用。

现在我们可以调用一个常见的执行操作count,来返回RDD中的记录数目。intsFromStringsRDD.count

执行的结果应该类似如下输出:14/01/29 23:28:28 INFO SparkContext: Starting job: count at :17 ...14/01/29 23:28:28 INFO SparkContext: Job finished: count at :17, took 0.019227 sres4: Long = 398

如果要计算这个文本文件里每行字符串的平均长度,可以先使用sum函数来对所有记录的长度求和,然后再除以总的记录数目:val sumOfRecords = intsFromStringsRDD.sumval numRecords = intsFromStringsRDD.countval aveLengthOfRecord = sumOfRecords / numRecords

结果应该如下:aveLengthOfRecord: Double = 52.06030150753769

Spark的大多数操作都会返回一个新RDD,但多数的执行操作则是返回计算的结果(比如上面例子中,count返回一个Long,sum返回一个Double)。这就意味着多个操作可以很自然地前后连接,从而让代码更为简洁明了。举例来说,用下面的一行代码可以得到和上面例子相同的结果:val aveLengthOfRecordChained = rddFromTextFile.map(line => line.size).sum / rddFromTextFile.count

值得注意的一点是,Spark中的转换操作是延后的。也就是说,在RDD上调用一个转换操作并不会立即触发相应的计算。相反,这些转换操作会链接起来,并只在有执行操作被调用时才被高效地计算。这样,大部分操作可以在集群上并行执行,只有必要时才计算结果并将其返回给驱动程序,从而提高了Spark的效率。

这就意味着,如果我们的Spark程序从未调用一个执行操作,就不会触发实际的计算,也不会得到任何结果。比如下面的代码就只是返回一个表示一系列转换操作的新RDD:val transformedRDD = rddFromTextFile.map(line => line.size).filter(size => size > 10).map(size => size * 2)

相应的终端输出如下:transformedRDD: org.apache.spark.rdd.RDD[Int] = MappedRDD[8] at map at :14

注意,这里实际上没有触发任何计算,也没有结果被返回。如果我们现在在新的RDD上调用一个执行操作,比如sum,该计算将会被触发:val computation = transformedRDD.sum

现在你可以看到一个Spark任务被启动,并返回如下终端输出:...14/11/27 21:48:21 INFO SparkContext: Job finished: sum at :16,took 0.193513 scomputation: Double = 60468.0RDD支持的转换和执行操作的完整列表以及更为详细的例子,参见《Spark编程指南》(http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations)以及Spark API(Scala)文档(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD)。

3. RDD缓存策略

Spark最为强大的功能之一便是能够把数据缓存在集群的内存里。这通过调用RDD的cache函数来实现:rddFromTextFile.cache

调用一个RDD的cache函数将会告诉Spark将这个RDD缓存在内存中。在RDD首次调用一个执行操作时,这个操作对应的计算会立即执行,数据会从数据源里读出并保存到内存。因此,首次调用cache函数所需要的时间会部分取决于Spark从输入源读取数据所需要的时间。但是,当下一次访问该数据集的时候,数据可以直接从内存中读出从而减少低效的I/O操作,加快计算。多数情况下,这会取得数倍的速度提升。

如果现在在已缓存了的RDD上调用count或sum函数,应该可以感觉到RDD的确已经载入到了内存中:val aveLengthOfRecordChained = rddFromTextFile.map(line => line.size).sum / rddFromTextFile.count

实际上,从下方的输出我们可以看到,数据在第一次调用cache时便已缓存到内存,并占用了大约62 KB的空间,余下270 MB可用:...14/01/30 06:59:27 INFO MemoryStore: ensureFreeSpace(63454) called with curMem=32960, maxMem=31138775014/01/30 06:59:27 INFO MemoryStore: Block rdd_2_0 stored as values to memory (estimated size 62.0 KB, free 296.9 MB)14/01/30 06:59:27 INFO BlockManagerMasterActor$BlockManagerInfo: Added rdd_2_0 in memory on 10.0.0.3:55089 (size: 62.0 KB, free: 296.9 MB)...

现在,我们再次求平均长度:val aveLengthOfRecordChainedFromCached = rddFromTextFile.map(line => line.size).sum / rddFromTextFile.count

从如下的输出中应该可以看出缓存的数据是从内存直接读出的:...14/01/30 06:59:34 INFO BlockManager: Found block rdd_2_0 locally...Spark支持更为细化的缓存策略。通过persist函数可以指定Spark的数据缓存策略。关于RDD缓存的更多信息可参见:http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence。1.3.4 广播变量和累加器

Spark的另一个核心功能是能创建两种特殊类型的变量:广播变量和累加器。

广播变量(broadcast variable)为只读变量,它由运行SparkContext的驱动程序创建后发送给会参与计算的节点。对那些需要让各工作节点高效地访问相同数据的应用场景,比如机器学习,这非常有用。Spark下创建广播变量只需在SparkContext上调用一个方法即可:val broadcastAList = sc.broadcast(List("a", "b", "c", "d", "e"))

终端的输出表明,广播变量存储在内存中,占用的空间大概是488字节,仍余下270 MB可用空间:14/01/30 07:13:32 INFO MemoryStore: ensureFreeSpace(488) called with curMem=96414, maxMem=31138775014/01/30 07:13:32 INFO MemoryStore: Block broadcast_1 stored as values to memory (estimated size 488.0 B, free 296.9 MB)broadCastAList: org.apache.spark.broadcast.Broadcast[List[String]] = Broadcast(1)

广播变量也可以被非驱动程序所在的节点(即工作节点)访问,访问的方法是调用该变量的value方法:sc.parallelize(List("1", "2", "3")).map(x => broadcastAList.value ++ x).collect

这段代码会从{"1", "2", "3"}这个集合(一个Scala List)里,新建一个带有三条记录的RDD。map函数里的代码会返回一个新的List对象。这个对象里的记录由之前创建的那个broadcastAList里的记录与新建的RDD里的三条记录分别拼接而成。

注意,上述代码使用了collect函数。这个函数是一个Spark执行函数,它将整个RDD以Scala(Python或Java)集合的形式返回驱动程序。

通常只在需将结果返回到驱动程序所在节点以供本地处理时,才调用collect函数。注意,collect函数一般仅在的确需要将整个结果集返回驱动程序并进行后续处理时才有必要调用。如果在一个非常大的数据集上调用该函数,可能耗尽驱动程序的可用内存,进而导致程序崩溃。高负荷的处理应尽可能地在整个集群上进行,从而避免驱动程序成为系统瓶颈。然而在不少情况下,将结果收集到驱动程序的确是有必要的。很多机器学习算法的迭代过程便属于这类情况。

从如下结果可以看出,新生成的RDD里包含3条记录,其每一条记录包含一个由原来被广播的List变量附加一个新的元素所构成的新记录(也就是说,新记录分别以1、2、3结尾)。...14/01/31 10:15:39 INFO SparkContext: Job finished: collect at :15, took 0.025806 sres6: Array[List[Any]] = Array(List(a, b, c, d, e, 1), List(a, b, c, d, e, 2), List(a, b, c, d, e, 3))

累加器(accumulator)也是一种被广播到工作节点的变量。累加器与广播变量的关键不同,是后者只能读取而前者却可累加。但支持的累加操作有一定的限制。具体来说,这种累加必须是一种有关联的操作,即它得能保证在全局范围内累加起来的值能被正确地并行计算以及返回驱动程序。每一个工作节点只能访问和操作其自己本地的累加器,全局累加器则只允许驱动程序访问。累加器同样可以在Spark代码中通过value访问。关于累加器的更多信息,可参见《Spark编程指南》:http://spark.apache.org/docs/latest/programming-guide.html#shared-variables。1.4Spark Scala编程入门

下面我们用上一节所提到的内容来编写一个简单的Spark数据处理程序。该程序将依次用Scala、Java和Python三种语言来编写。所用数据是客户在我们在线商店的商品购买记录。该数据存在一个CSV文件中,名为UserPurchaseHistory.csv,内容如下所示。文件的每一行对应一条购买记录,从左到右的各列值依次为客户名称、商品名以及商品价格。John,iPhone Cover,9.99John,Headphones,5.49Jack,iPhone Cover,9.99Jill,Samsung Galaxy Cover,8.95Bob,iPad Cover,5.49

对于Scala程序而言,需要创建两个文件:Scala代码文件以及项目的构建配置文件。项目将使用SBT(Scala Build Tool,Scala构建工具)来构建。为便于理解,建议读者下载示例代码scala-spark-app。该资源里的data目录下包含了上述CSV文件。运行这个示例项目需要系统中已经安装好SBT(编写本书时所使用的版本为0.13.1)。配置SBT并不在本书讨论范围内,但读者可以从http://www.scala-sbt.org/release/docs/Getting-Started/Setup.html找到更多信息。

我们的SBT配置文件是build.sbt,其内容如下面所示(注意,各行代码之间的空行是必需的):name := "scala-spark-app"version := "1.0"scalaVersion := "2.10.4"libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0 "

最后一行代码是添加Spark到本项目的依赖库。

相应的Scala程序在ScalaApp.scala这个文件里。接下来我们会逐一讲解代码的各个部分。首先,导入所需要的Spark类:import org.apache.spark.SparkContextimport org.apache.spark.SparkContext._ /** * 用Scala编写的一个简单的Spark应用 */object ScalaApp {

在主函数里,我们要初始化所需的SparkContext对象,并且用它通过textFile函数来访问CSV数据文件。之后对每一行原始字符串以逗号为分隔符进行分割,提取出相应的用户名、产品和价格信息,从而完成对原始文本的映射:def main(args: Array[String]) { val sc = new SparkContext("local[2]", "First Spark App") // 将CSV格式的原始数据转化为(user,product,price)格式的记录集 val data = sc.textFile("data/UserPurchaseHistory.csv") .map(line => line.split(",")) .map(purchaseRecord => (purchaseRecord(0), purchaseRecord(1), purchaseRecord(2)))

现在,我们有了一个RDD,其每条记录都由(user, product, price)三个字段构成。我们可以对商店计算如下指标:● 购买总次数● 客户总个数● 总收入● 最畅销的产品

计算方法如下:// 求购买次数val numPurchases = data.count()// 求有多少个不同客户购买过商品val uniqueUsers = data.map{ case (user, product, price) => user }.distinct().count()// 求和得出总收入val totalRevenue = data.map{ case (user, product, price) => price.toDouble }.sum()// 求最畅销的产品是什么val productsByPopularity = data .map{ case (user, product, price) => (product, 1) } .reduceByKey(_ + _) .collect() .sortBy(-_._2)val mostPopular = productsByPopularity(0)

最后那段计算最畅销产品的代码演示了如何进行Map/Reduce模式的计算,该模式随Hadoop而流行。第一步,我们将(user, product, price)格式的记录映射为(product, 1)格式。然后,我们执行一个reduceByKey操作,它会对各个产品的1值进行求和。

转换后的RDD包含各个商品的购买次数。有了这个RDD后,我们可以调用collect函数,这会将其计算结果以Scala集合的形式返回驱动程序。之后在驱动程序的本地对这些记录按照购买次数进行排序。(注意,在实际处理大量数据时,我们通常通过sortByKey这类操作来对其进行并行排序。)

最后,可在终端上打印出计算结果: println("Total purchases: " + numPurchases) println("Unique users: " + uniqueUsers) println("Total revenue: " + totalRevenue) println("Most popular product: %s with %d purchases".format(mostPopular._1, mostPopular._2)) }}

可以在项目的主目录下执行sbt run命令来运行这个程序。如果你使用了IDE的话,也可以从Scala IDE直接运行。最终的输出应该与下面的内容相似:...[info] Compiling 1 Scala source to ...[info] Running ScalaApp...14/01/30 10:54:40 INFO spark.SparkContext: Job finished: collect atScalaApp.scala:25, took 0.045181 sTotal purchases: 5Unique users: 4Total revenue: 39.91Most popular product: iPhone Cover with 2 purchases

可以看到,商店总共有4个客户的5次交易,总收入为39.91。最畅销的商品是iPhone Cover,共购买2次。1.5Spark Java编程入门

Java API与Scala API本质上很相似。Scala代码可以很方便地调用Java代码,但某些Scala代码却无法在Java里调用,特别是那些使用了隐式类型转换、默认参数和采用了某些Scala反射机制的代码。

一般来说,这些特性在Scala程序中会被广泛使用。这就有必要另外为那些常见的类编写相应的Java版本。由此,SparkContext有了对应的Java版本JavaSparkContext,而RDD则对应JavaRDD。

1.8及之前版本的Java并不支持匿名函数,在函数式编程上也没有严格的语法规范。于是,套用到Spark的Java API上的函数必须要实现一个带有call函数的WrappedFunction接口。这会使得代码冗长,所以我们经常会创建临时类来传递给Spark操作。这些类会实现操作所需的接口以及call函数,以取得和用Scala编写时相同的效果。

Spark提供对Java 8匿名函数(lambda)语法的支持。使用该语法能让Java 8书写的代码看上去很像等效的Scala版。

用Scala编写时,键/值对记录的RDD能支持一些特别的操作(比如reduceByKey和saveAsSequenceFile)。这些操作可以通过隐式类型转换而自动被调用。用Java编写时,则需要特别类型的JavaRDD来支持这些操作。它们包括用于键/值对的JavaPairRDD,以及用于数值记录的JavaDoubleRDD。我们在这里只涉及标准的Java API语法。关于Java下支持的RDD以及Java 8 lambda表达式支持的更多信息可参见《Spark编程指南》:http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations。

在后面的Java程序中,我们可以看到大部分差异。这些示例代码包含在本章示例代码的java-spark-app目录下。该目录的data子目录下也包含上述CSV数据。

这里会使用Maven构建工具来编译和运行这个项目。我们假设读者已经在其系统上安装好了该工具。Maven的安装和配置并不在本书讨论范围内。通常它可通过Linux系统中的软件管理器或Mac OS X中的HomeBrew或MacPorts方便地安装。详细的安装指南参见:http://maven.apache.org/download.cgi。

项目中包含一个名为JavaApp.java的Java源文件:import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.DoubleFunction;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;import java.util.Collections;import java.util.Comparator;import java.util.List;/** * 用Java编写的一个简单的Spark应用 */public class JavaApp { public static void main(String[] args) {

正如在Scala项目中一样,我们首先需要初始化一个上下文对象。值得注意的是,这里所使用的是JavaSparkContext类而不是之前的SparkContext。类似地,调用JavaSparkContext对象,利用textFile函数来访问数据,然后将各行输入分割成多个字段。请注意下面代码的高亮部分是如何使用匿名类来定义一个分割函数的。该函数确定了如何对各行字符串进行分割。JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");// 将CSV格式的原始数据转化为(user,product,price)格式的记录集JavaRDD data = sc.textFile("data/UserPurchaseHistory.csv").map(new Function() { @Override public String[] call(String s) throws Exception { return s.split(","); }});

现在可以算一下用Scala时计算过的指标。这里有两点值得注意的地方,一是下面Java API中有些函数(比如distinct和count)实际上和在Scala API中一样,二是我们定义了一个匿名类并将其传给map函数。匿名类的定义方式可参见代码的高亮部分。// 求总购买次数long numPurchases = data.count();// 求有多少个不同客户购买过商品long uniqueUsers = data.map(new Function() { @Override public String call(String[] strings) throws Exception { return strings[0]; }}).distinct().count();// 求和得出总收入double totalRevenue = data.map(new DoubleFunction(){ @Override public Double call(String[] strings) throws Exception { return Double.parseDouble(strings[2]); }}).sum();

下面的代码展现了如何求出最畅销的产品,其步骤与Scala示例的相同。多出的那些代码看似复杂,但它们大多与Java中创建匿名函数有关,实际功能与用Scala时一样: // 求最畅销的产品是哪个 // 首先用一个PairFunction和Tuple2类将数据映射成为(product,1)格式的记录 // 然后,用一个Function2类来调用reduceByKey操作,该操作实际上是一个求和函数 List> pairs = data.map(new PairFunction() { @Override

试读结束[说明:试读内容隐藏了图片]

下载完整电子书


相关推荐

最新文章


© 2020 txtepub下载