Spark Streaming实时流式大数据处理实战(txt+pdf+epub+mobi电子书下载)


发布时间:2020-07-01 06:56:09

点击下载

作者:肖力涛

出版社:机械工业出版社

格式: AZW3, DOCX, EPUB, MOBI, PDF, TXT

Spark Streaming实时流式大数据处理实战

Spark Streaming实时流式大数据处理实战试读:

前言

为什么要写这本书?

对于计算机从业人员和在校大学生而言,多少都会接触到数据处理,如简单的信息管理系统和利用关系型数据库设计的存储系统等,这类系统通常涉及的数据量比较小。而随着互联网的发展,企业内部的数据量也呈现爆炸式增长,随之而来的大数据处理就会是一件非常棘手的事情。所以近年来随着大数据概念的火爆,也涌现出了越来越多的大数据处理平台,如Hadoop、Hive、HBase、Flume、Kafka、Storm和Spark等,让人眼花缭乱。开发人员需要针对具体的场景和任务特点,选择合适的工具,将它们组合起来以完成任务。

本书围绕大数据处理领域应用最广泛的Spark平台展开讲解,并对时下比较热门的大数据平台都有所介绍,以此为基础重点切入流式大数据处理这个比较垂直和常用的领域,对Spark Streaming、Kafka和ZooKeeper等大数据处理工具进行介绍,并给出多个实战案例,让读者能够从零到一学习如何构建一个大数据处理任务,掌握如何选择合适的处理工具,以及学习编程中一些常见的技巧。

本书特色

1.内容丰富,讲解详细

本书对大数据的相关知识体系做了详细阐述,并对Spark平台和Spark Streaming及其涉及的大数据平台做了重点阐述,以方便读者掌握常用的大数据架构平台。

2.原理分析与应用实践并重

本书对涉及的知识点详细地阐述了其背后的基本原理,并给出了大量的应用实践,便于读者更加透彻地理解所学知识,从而在调优和排查问题等具体实践时更加得心应手。

3.详解大量的应用实例和实战案例

本书中的每个章节都安排了实例,以方便读者动手演练。另外,第8~10章还给出了3个实战案例,以帮助读者提高实际的项目开发水平。这些案例改写自笔者和同事在工作中的真实应用案例,有较高的实用价值,读者在实践中可以进行借鉴。

4.提供详细的源代码

笔者对书中涉及的所有源代码都进行了整理并开源,供读者下载使用。读者可以对这些代码稍加修改,即可用于自己的项目中。

本书内容

第1篇 Spark基础(第1~3章)

本篇重点围绕Spark平台进行讲解,并具体就如何搭建一个自己的Spark集群进行了详细介绍,为后面的实战演练打下基础。

第1章初识Spark,从Spark的历史发展出发,重点介绍了流式处理任务,对比了不同流式处理框架,并介绍了Spark Streaming的特点。

第2章Spark运行与开发环境,主要介绍了如何搭建Spark集群,以及如何从零到一开始开发Spark应用程序,最后对从文件中进行词频统计的Spark应用做了介绍。

第3章Spark编程模型,对Spark的核心编程模型做了详细讲解,这对于开发Spark应用及Spark Streaming应用优化来说都是必要的。另外,本章还对RDD的各种操作做了讲解。

第2篇 Spark Streaming详解(第4~7章)

本篇重点阐述了Spark Streaming的编程模型和特点,并将一些常用的大数据平台与Spark Streaming相结合进行讲解,最后对Spark Streaming应用中常见的调优实践进行了总结。

第4章Spark Streaming编程模型及原理,着重介绍了Spark Streaming的运行原理,并且讲解了Spark Streaming应用开发的必要知识,最后以Spark Streaming接收网络输入流并进行词频统计进行实例演练。

第5章Spark Streaming与Kafka,重点介绍了Spark Streaming与Kafka配合使用的相关知识点,并介绍了在部署时常见的ZooKeeper平台,最后利用Kafka作为Spark Streaming的输入源进行分析操作。

第6章Spark Streaming与外部存储介质,主要介绍了流式处理任务中如何将处理结果输出到外部存储介质等相关知识。本章就一些常用的数据库与Spark Streaming结合进行讲解,最后结合日志分析实例,将日志文件分析后输出到MySQL中,可以让读者了解整个流程。

第7章Spark Streaming调优实践,介绍了在实际生产中如何根据具体的数据量和任务情况对Spark Streaming进行优化修改,并且以一个具体的项目调优实例讲解调优的分析过程。

第3篇 Spark Streaming案例实战(第8~10章)

经过前两篇的学习,读者应该已经掌握了Spark和Spark Streaming的基本原理及开发技术。本篇在此基础上进行实战演练,带领读者完成3个大数据项目实战案例。

第8章实时词频统计处理系统实战,针对文本数据常见的流式处理任务,通过一个实战案例,对词频统计从设计、实现到部署的相关知识进行了详细讲解。

第9章用户行为统计实战,通过一个实战案例,介绍了在广告行为分析和推荐系统中如何对用户行为进行统计分析。

第10章监控报警系统实战,对监控报警系统提出了一种架构上的设计思路,即以Kafka为数据总线串联,利用爬虫技术爬取数据,再用Spark Streaming进行过滤处理和后续的归纳汇总报警。

附录A Scala语言基础,对本书在讲解时所采用的Spark源生语言Scala的基础知识做了简单讲解,用于帮助对Scala还不是很熟悉的读者。

本书读者对象

阅读本书需要读者有一定的编程经验,建议读者最好对Java和C++等面向对象编程语言有一定的了解。具体而言,本书主要适合以下读者阅读:·有一定编程基础的Spark初学者;·了解Spark,想进一步使用Spark Streaming的从业人员;·流式大数据处理程序员;·对Spark和Spark Streaming感兴趣的程序员;·高校相关专业的学生;·大数据技术培训机构的学员。

本书阅读建议·基础相对薄弱的读者,可以先从附录开始了解Scala语言的特性,然后从第1章顺次阅读本书。·Java基础良好的读者可以直接顺次阅读本书,阅读中涉及的Scala语言特性可在附录中查阅。·对Spark有所了解的读者可以直接从本书第2篇开始阅读,即从本书第4章开始阅读。·对Spark及Spark Streaming比较熟悉的读者,可以直接动手演练本书第3篇中的3个实战案例。·学习时一定要亲自动手编写代码进行实践,再结合实际场景才能更好地掌握相关技术。

本书配套资源

本书涉及的所有源代码都已经开源并提供在了GitHub上,读者可以根据自己的需要进行下载,下载地址为https://github.com/xlturing/spark-streaming-action。另外,读者也可以登录华章公司的网站www.hzbook.com,在该网站上搜索到本书,然后单击“资料下载”按钮,再单击页面上的“配书资源”链接进行下载。

读者反馈

由于笔者水平所限,书中可能还存在一些疏漏,敬请读者指正,笔者会及时进行调整和修改。联系我们可通过电子邮箱litaoxiao@gmail.com或hzbook2017@163.com。笔者会将一些反馈信息整理在博客中(http://www.cnblogs.com/xlturing)。另外也欢迎读者关注笔者的微信公众号pang tao1027/互联网技术猿,笔者会定期分享一些技术文章。

致谢

感谢洪福兴在第9章内容上给予笔者的宝贵意见!

感谢在腾讯工作期间,辛愿、李铮、刘绩刚和方亮等人给予笔者的指导与帮助!

感谢本书编辑在本书出版过程中给予笔者的大力支持与帮助!

最后感谢我的家人在写书上给予我的理解与支持,在遇到挫折和困难时,我的家人都坚定地支持着我。爱你们!肖力涛第1篇Spark基础·第1章 初识Spark·第2章 Spark运行与开发环境·第3章 Spark编程模型第1章 初识Spark

笔者目前正在使用微软的Word进行书籍的撰写。而Word中一个很好用的功能便是拼写检查,当发生拼写错误时,会提供一个列表让我们选择。而背后的原理就是Word使用了一份庞大的词典来进行匹配,类似于专家的人工匹配行为。

而另一种思路是借用群体智慧,我们在使用谷歌浏览器的时候,会遇到一个“你是不是找”的功能模块,当输入比较“冷门”的搜索条件时,谷歌浏览器会给出一个更加准确的搜索条件,如图1.1所示。图1.1 谷歌搜索拼写纠正

谷歌就是利用了大数据,当我们输错一个词的时候,在每日海量的搜索数据中,一定有跟我们搜索相同内容的用户,他们会重新输入,那么这个重新输入的词也许就是我们想要的词;而另一方面,如果用户单击了该词,说明匹配正确,这样反馈学习的机制能够更好地提高拼写纠正的准确性。

类似的场景已经融入了人们生活中的方方面面,例如淘宝购物,平台会根据用户的购买行为记录推荐用户可能感兴趣的商品;看新闻,App根据用户个人行为记录及群体的观看记录,向用户推荐热点新闻和用户感兴趣的新闻;社交平台,根据用户大量的记录构建用户画像,进行更加精准的广告投放。

互联网时代,社交网络、电子商务与移动通信将我们的社会推向了一个以PB(1024TB)为单位的结构与非结构数据的新大数据时代。而面对海量的数据我们需要以更加高效的方式进行挖掘与应用,这就提出了大数据处理的需求。前几年随着Hadoop的兴起,大数据处理一时风起云涌,如图1.2展示了一个大数据平台的全景。

Data一词源于拉丁语,其本意是要对未来进行预测。这也正反映了大数据处理的核心任务——预测。大数据平台给我们提供了面对海量数据进行挖掘的能力,将其转化为生产力并产生价值。图1.2 大数据平台全景

也许有读者不禁会问:既然Hadoop平台中Map/Reduce框架的提出在很大程度上解决了大数据的处理问题,那么为什么还会诞生Spark呢?本节就来介绍Spark的由来。1.1 Spark由来

Spark最早源于一篇论文Resilient Distributed Datasets:A Fault-Tolerant Abstraction for In-Memory Cluster Computing。该论文是由加州大学柏克莱分校的Matei Zaharia等人发表的。论文中提出了一种弹性分布式数据集(即RDD)的概念,原文开头对其的解释是:

A distributed memory abstraction that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner.

翻译过来就是:RDD是一种分布式内存抽象,其使得程序员能够在大规模集群中做内存运算,并且有一定的容错方式。而这也是整个Spark的核心数据结构,Spark整个平台都围绕着RDD进行。之后加州大学柏克莱分校AMPLab将其开发出来。

Apache Spark是一种针对大规模数据处理的快速通用开源引擎,主要有以下特点。·速度快:由于Apache Spark支持内存计算,并且通过DAG(有向无环图)执行引擎支持无环数据流,所以官方宣称其在内存中的运算速度要比Hadoop的MapReduce快100倍,在硬盘中要快10倍,如图1.3所示。图1.3 逻辑回归在Hadoop和Spark中运算速度对比·易于使用:截至笔者完稿时,Spark的版本已经更新到Spark 2.3.1,支持了包括Java、Scala、Python和R语言在内的多种语言。·通用性强:在Spark的基础上,Spark还提供了包括Spark SQL、Spark Streaming、MLib及GraphX在内的多个工具库,我们可以在一个应用中无缝地使用这些工具库。其中,Spark SQL提供了结构化的数据处理方式,Spark Streaming主要针对流式处理任务(也是本书的重点),MLib提供了很多有用的机器学习算法库,GraphX提供图形和图形并行化计算,如图1.4所示。图1.4 Spark及其工具库·运行方式:Spark支持多种运行方式,包括在Hadoop和Mesos上,也支持Standalone的独立运行模式,同时也可以运行在云上。另外对于数据源而言,Spark支持从HDFS、HBase、Cassandra及Kafka等多种途径获取数据。

Spark内部引入了一种称为弹性分布式数据集的结构(RDD),在数据结构之间利用有向无环图(DAG)进行数据结构间变化的记录,这样可以方便地将公共的数据共享,并且当数据发生丢失时,可以依靠这种继承结构(血统Lineage)进行数据重建,具有很强的容错性。1.2 流式处理与Spark Streaming

在很多实时数据处理的场景中,都需要用到流式处理框架,Spark也包含了一个完整的流式处理框架Spark Streaming。本节我们先阐述流式处理框架,之后介绍Spark Streaming。1.2.1 流式处理框架

在传统的数据处理过程中,我们往往先将数据存入数据库中,当需要的时候再去数据库中进行检索查询,将处理的结果返回给请求的用户;另外,MapReduce这类大数据处理框架,更多应用在离线计算场景中。而对于一些实时性要求较高的场景,我们期望延迟在秒甚至毫秒级别,就需要引出一种新的数据计算结构——流式计算,对无边界的数据进行连续不断的处理、聚合和分析。

流式处理任务是大数据处理中很重要的一个分支,关于流式计算的框架也有很多,如比较出名的Storm流式处理框架,是由Nathan Marz等人于2010年最先开发,之后将Storm开源,成为Apache的顶级项目,Trident对Storm进行了一个更高层次的抽象;另外由LinkedIn贡献给社区的Samza也是一种流处理解决方案,不过其构建严重依赖于另一个开源项目Kafka。

Spark Streaming构建在Spark的基础之上,随着Spark的发展,Spark Streaming也受到了越来越多的关注。

不同的流式处理框架有不同的特点,也适应不同的场景。

1.处理模式

对于流式处理框架而言,有两种完全不同的处理模式。一种是原生流处理(Native)的方式,即所有输入记录会一条接一条地被处理,上面我们提到的Storm和Samza都是采用这种方式。

另外一种是微批处理(Batch)的方式,将输入的数据以某一时间间隔T,切分成多个微批量数据,然后对每个批量数据进行处理,Spark Streaming和Trident采用的是这种方式。两种处理模式的区别,如图1.5所示。图1.5 流式处理框架的不同处理模式

2.消息传输保障

一般有3种消息传输保障,分别是At most once、At least once和Exactly once。At most once表示每条消息传输的次数为0次或者1次,即消息可能会丢失;At least once表示每条消息传输的次数大于等于1次,即消息传输可能重复传输但保证不会丢失;Exactly once表示每条消息只会精确地传递1次,即消息传输过程中既不会丢失,也不会重复。Storm和Samza保证了At least once,而Spark Streaming和Trident保证了Exactly once。

3.容错机制

在企业的日常生产环境中,流式处理发生中断出错的现象是常有的情况,可能是发生在网络部分、某个节点宕机或程序异常等。

因此流式处理框架应该具备容错能力,当发生错误导致任务中断后,应该能够恢复到之前成功的状态重新消费。Storm和Trident是利用记录确认机制(Record ACKs)来提供容错功能,Samza采用了基于日志的容错方式,而Spark Streaming则采用了基于RDD Checkpoint的方式进行容错。

4.性能

流式处理框架自然要关注一些性能指标,从而了解不同框架的特点,包括延迟时间(Latency)和吞吐量(Throughput)等指标。Storm和Samza采用了逐条处理记录的方式,其延迟时间很低,其中Storm在实时性方面表现更加优异;而Spark Streaming和Trident采用了微批处理的方式,所以其延迟时间较高。另一方面,在吞吐量上,Spark Streaming和Samza的表现要优于Storm和Trident。

通过流式处理框架的介绍,以及和不同流式处理框架的比较,我们了解了Spark Streaming作为新兴的流式处理框架的特点,下面对Spark Streaming做一些更加详细的介绍。1.2.2 Spark Streaming初识

在Spark的核心基础上,Spark Streaming是一个高吞吐、高容错的实时流处理系统。Spark Streaming可以从Kafka、Flume、Kinesis或者TCP套接字获取数据,然后利用复杂的操作(如map、reduce、window等)对其进行处理,最终将处理后的数据输出到文件系统、数据库或者控制台等,输入与输出的过程如图1.6所示。图1.6 Spark Streaming输入和输出过程

实际上,Spark Streaming在接收到实时的流数据时,会将其按照批数据(batch)来处理,之后再对批数据进行操作得到最终的结果数据流,如图1.7所示。图1.7 Spark批数据处理过程

可以看出,Spark Streaming在内部采用了上述介绍流式处理框架时提到的微批处理的处理模式,而非直接对原始数据流进行处理。Spark Streaming对这种处理方式做了一个更高层的抽象,将原始的连续的数据流抽象后得到的多个批处理数据(batches)抽象为离散数据流(discretized stream),即DStream。DStream本身有两种产生方式:一是从Kafka、Flume或者Kinesis等输入数据流上直接创建;二是对其他DStream采用高阶API操作之后得到(如map、flatMap等)。在其内部,DStream本质是由RDD数据结构的序列来表示的,关于RDD我们会在后续3.6节中进一步说明。1.2.3 Structed Streaming简述

Spark在2.0版之后加入了一种新的流式处理模式,即结构化流式处理(Structed Streaming)。不同于Spark Streaming是以RDD构成的DStream为处理结构,结构化流是一种基于Spark SQL引擎的可扩展且容错的流处理引擎。

我们可以像表达静态数据的批处理计算一样表达流式计算。Spark SQL引擎将负责让语句按顺序地执行,并根据接收到的数据持续更新最终结果。与Spark Streaming类似,结构化流也提供了包括Scala、Java、Python及R在内的完善的API机制,并且通过检查点保证端到端的一次性容错。

Structed Streaming与Spark Streaming类似,是一种微批处理的实时流处理系统,也就是说内部并不是逐条处理数据记录,而是按照一个个小batch来处理,从而实现低至100毫秒的端到端延迟和一次性容错保证。不过在最新的Spark 2.3以后,提供了更加低延迟的处理模式,能够低至1毫秒的端到端延迟,这是与Spark Streaming的区别。

由于本书主要介绍Spark Streaming,因此这里不再展开,读者只需要了解Spark还有一种流式处理模式(在6.6节的日志分析实例中,将结合Spark Streaming和Spark SQL对日志信息进行分析处理和输出),也可视为一种结构化的处理方案,读者可以尝试用Structed Streaming处理这类数据。1.3 本章小结·Spark的核心数据结构是RDD,即弹性分布式数据集。·Spark Streaming采用微批处理模式,保证消息传输的精准性,采用checkpoint作为容错机制,具有良好的吞吐性能,延时表现并非真正实时。·Spark Streaming可以接收Kafka和HDFS等在内的多种数据源,经过批数据处理,输出到HDFS和数据库等。·Structed Streaming是Spark 2.0之后引入的结构化数据流,不同域的Spark Streaming以RDD为基础,而Structed Streaming更多以Spark SQL为基础,并且能够做到更低的延迟,希望读者能掌握本章内容,在实际项目中针对具体应用场景进行选择。第2章 Spark运行与开发环境“工欲善其事,必先利其器”。我们想利用Spark对海量的数据进行挖掘、预测,必须先对Spark的运行开发环境有一个整体的部署。本章就对Spark的部署与安装,以及Spark开发环境的搭建进行介绍。2.1 Spark的下载与安装

2014年5月30日,Spark正式将版本号定为1.0.0,成为一个成熟的大数据处理框架。至今,Spark已经迭代了很多版本,而截止到笔者完稿时,最新的Spark版本是Spark 2.3.2。在整个书籍的书写过程中,Spark从2.2更新到了2.3,所以在书中的阐述会有版本上的变化,但是这两个版本间的差异并不大,不影响我们的学习。

值得注意的是,2016年7月26日发布的Spark 2.0.0是一个大版本的更新,很多的API接口和底层的实现细节都做出了一些优化,即Spark 1.6之后,版本跳到了2.0,一些接口是不兼容的,这点希望读者在使用Spark的时候多加留意。

要安装Spark,首先来到Spark的官方下载界面,如图2.1所示。图2.1 Spark官方下载页面

在官方提供的下载页面中,首先选择Spark的版本号,目前提供的版本号最早到1.4.0,更早的版本在官网已经不提供下载了;然后根据自己的需要选择Hadoop的版本,再选择一个适合的镜像库来下载Spark;最后单击Download按钮等待下载完成。

需要注意的是,这样下载的Spark是已经编译好的Spark,可以直接使用。如果需要对源码进行修改,可自行编译,也可以从Git上把源代码复制下来:# 复制Master主分支git clone git://github.com/apache/spark.git# 下载指定的稳定版本Sparkgit clone git://github.com/apache/spark.git -b branch-2.1

在进入安装环境前,笔者自己的计算机运行环境和版本说明如下。·操作系统:Mac OS 10.12.6;·Java版本:1.8;·Spark版本:2.2;·Scala版本:2.11~2.12。

通过官网下载得到的文件为spark-2.2.0-bin-hadoop2.7.tgz,将压缩包解压到指定目录,其目录结构如下:$ ls spark-2.2.0-bin-hadoop2.7/LICENSE R RELEASE conf examples licenses python workNOTICE README.md bin data jars logs sbin yarn

其中,我们主要关注conf和sbin目录。conf目录就是配置文件所在的目录,sbin目录包含了Spark集群操作的大多数命令。至此,我们已将Spark下载到本地,在启动Spark前,还需要了解Spark的运行模式,并对Spark做一些基本的配置。2.2 Spark运行模式

Spark提供了4种模式,分别是本地模式、Standalone模式、Spark On Yarn模式及Spark On Mesos模式。其中,本地模式包含了单机模式和单机伪集群模式,用于基本的调试与实验,而另外3种模式都是基于不同资源调配的集群模式,一般是生产环境中搭建的分布式集群。

为了更清楚地讲解Spark中不同的运行模式,我们先对Spark集群的运作方式从整体上进行一个介绍,其中需要清楚几个关键的概念,如图2.2所示。

图2.2中给出了应用程序在Spark集群中运行时涉及的相关概念。·Application:提交到Spark集群的应用程序,简称App。·Driver:执行应用程序中创建SparkContext的main函数的进程,一般在集群的任何节点向集群提交应用程序,就可以将该节点称做Driver节点。·Cluster manager:即集群管理器,作为Spark集群的“神经中枢”,统筹管理Spark集群的各种资源,包括CPU和内存等,并分配不同服务所需的资源(例如standalone manager即Master、Mesos和Yarn)。·Master节点:即部署Cluster manager的节点,是一个物理层的概念。·Worker:任何在集群中运行应用程序的节点,其接收集群管理器的调度安排,为应用程序分配必需的资源,生成Executor,起到桥梁作用。·Slave节点:即部署Worker的机器节点,每个Slave节点可以有多个Worker进程,是一个物理层的概念。·Executor:表示应用在Worker节点中进行实际计算的继承,进程会接收切分好的Task任务,并将结果缓存在节点内存和磁盘上。·Task:被分配到各个Executor的单位工作内容,它是Spark中的最小执行单位,一般来说有多少个Paritition(物理层面的概念,即分支可以理解为将数据划分成不同部分并行处理),就会有多少个Task,每个Task只会处理单一分支上的数据。·Job:由多个Task的并行计算部分,一般Spark中的action操作(如save、collect,下一章会进一步说明),会生成一个Job。·Stage:是Job的组成单位,一个Job会切分成多个Stage,Stage彼此之间相互依赖顺序执行,而每个Stage是多个Task的集合,类似map和reduce stage。图2.2 Spark的不同组件

下面用一个例子来解释Spark应用的过程,以及其中概念的对应关系。假设我们需要做如下所述的事情,如图2.3所示(将在第3章介绍RDD的各类操作,然后在3.8节用一个小实例实现这个例子,读者在学习第3章后可以动手实践该例子)。(1)将一个包含人名和地址的文件加载到RDD1中。(2)将一个包含人名和电话的文件加载到RDD2中。(3)通过name来拼接(join)RDD1和RDD2,生成RDD3。(4)在RDD3上做映射(map),给每个人生成一个HTML展示卡作为RDD4。(5)将RDD4保存到文件中。(6)在RDD1上做映射(map),从每个地址中提取邮编,结果生成RDD5。(7)在RDD5上做聚合,计算出每个邮编地区中生活的人数,结果生成RDD6。(8)收集(collect)RDD6,并且将这些统计结果输出到stdout。图2.3 以概念解释例子

图2.3中第①、②、⑤、⑧步涉及输入和输出操作,其余是对RDD的操作(数字①~⑧对应上面所述的8个步骤)。以这个例子为参考,接下来解释Driver program、Job和Stage这几个概念。·Driver program是全部的代码,运行所有的8个步骤。·第⑤步中的save和第⑧步中的collect都会产生Spark Job。Spark中每个action对应着一个Job,注意Transformation不会产生Job。·其他几步(①、②、③、④、⑥、⑦)被Spark组织成多个Stages,每个Job则是一些Stage序列的结果。对于一些简单的场景,一个Job可以只有一个Stage。但是对于数据重分区的需求(比如第③步中的join),或者任何破坏数据局域性的事件,通常会产生更多的Stage。可以将Stage看作能够产生中间结果的计算,这种计算可以被持久化,比如可以把RDD1持久化来避免重复计算。

以上3个概念解释了某个程序运行时被拆分的逻辑。相比之下,Task是一个特定的数据片段,在指定的Executor上运行,并且可以跨越某个特定的Stage。2.2.1 本地模式

该模式又可以称为单机模式,是Spark为开发人员提供的单机测试环境,利用单机的多个线程来模拟Spark分布式计算,用于对程序进行调试,验证应用逻辑的正确性。

我们可以通过两种方式来启用这种模式运行我们的程序:一种是在向Spark提交应用时,利用--master local[N]参数来设置;另一种方式是直接在程序中用setMaster("local[N]")进行设置。

其中N表示用几个线程来模仿Spark集群节点,从而模仿应用程序在集群上的执行,该运行模式非常简单,我们不需要启动任何Spark的Master和Worker等守护进程,另外如果不需要使用HDFS,也不需要启动Hadoop的各项服务。

举一个简单的例子,假设有一个Spark小程序用来统计input.txt文件中每个单词出现的数量,主类是com.spark.hello.HelloSpark,有两种方式来利用local模式运行该程序。

我们可以在向Spark提交的时候,在命令行中直接加入--master local[2]参数,代码如下:$ spark-submit --class com.spark.hello.HelloSparkStreaming --master local [2] target/spark-streaming_hello-0.1-jar-with-dependencies.jar file: //input.txt

或者在初始化SparkContext时,在配置master时设置:val conf = new SparkConf().setAppName("spark-streaming_hello").setMaster ("local[2]")2.2.2 本地集群模式

除了本地模式之外,Spark还提供了一种用于本地测试和调试的模式,就是本地集群模式,该模式会利用当前的单一机器启动多个进程来模拟集群的分布式场景,相比local[N]模式中多个线程分享一个进程的资源,这种模式会更加接近真实的集群环境。通常我们会在部署到集群前,对程序做进一步的测试。

与local[N]模式类似,我们也可以利用运行时参数master local-cluster[x,y,z]或者调用SetMaster("local-cluster[x,y,z]")两种方式来启动集群模式。利用local-cluster[x,y,z]的形式分别对executor的数量x、每个executor的core数量y及内存空间大小z进行设置。2.2.3 Standalone模式

Standalone模式是Spark自带的一种集群模式,不同于前面利用多线程或者多个进程来模拟集群的环境,Standalone模式是真实地在多个机器之间搭建Spark集群的环境,这才体现了分布式的真正价值,实际运用中完全可以利用该模式搭建多机器集群,用于实际的大数据处理。

前面已经介绍了Spark中的基本组件,Standalone模式就是利用Spark自带的Cluster Manager,不需要依赖于其他如Hadoop的服务,除非需要用到HDFS的内容。为了让大家对Spark集群有一个更加直观的感受,按照实际环境中的Spark集群构建,如图2.4所示。图2.4 Standalone集群框架图示例

一般在实际生产环境中,由于Master节点起到了资源分配和任务管理的重要角色,如果Master节点出问题会造成整个集群的瘫痪,所以我们会利用ZooKeeper的特性(ZooKeeper是一个分布式的应用程序协调服务,它能够进行配置维护、分布式同步等,我们会在5.1节对ZooKeeper的内容进行更具体的介绍和部署),对Master节点做一个主备切换的容灾处理。另外,图2.4中还包含了一个Driver节点及两个Worker节点。

在不需要HDFS的应用场景中,Standalone模式可以快捷、轻便地进行集群部署,不过该模式对于每个应用程序资源的分配都是固定的,并不能做到动态分配。本书在Spark的实际操作中,主要是依托于该模式来进行讲解,这种模式也可以适应很多的应用场景,之前笔者在实际项目应用中,针对每天千万量级的词频数据统计也是依托于该模式部署的。2.2.4 Spark On Yarn模式

Spark在0.6.0版本之后,添加了对Yarn模式的支持。通常,当我们已经部署了Hadoop集群时,可以将Spark统一在Yarn模式下进行资源分配管理,有利于资源上的整合与共享。

Spark在Yarn模式上分为Yarn client模式和Yarn cluster模式,两者的主要区别是,在Yarn cluster模式中,应用程序都作为Yarn框架所需要的主应用程序(Application Master),并通过Yarn资源管理器(Yarn ResourceManager)为其分配的一个随机节点上运行。而当我们需要本地交互时,可以利用Yarn client模式,该模型下Spark上下文(Spark-Context)会运行在本地,如Spark Shell和Shark等。因为公司内部大多数都会部署Hadoop集群,利用HDFS和Hive等进行存储管理,所以公司内部部署的公共Spark集群大多会依托于该模式。2.2.5 Spark On Mesos模式

Mesos是Apache下的开源分布式资源管理框架,同Yarn类似,Spark也提供了利用Mesos进行资源管理的方式,即Spark On Mesos模式。该模式可以细分为细粒度和粗粒度两种运行模式,关于Mesos的部署及安装,这里不做过多阐述,感兴趣的读者可以查阅官方文档。2.3 搭建开发环境

在2.2节中对Spark的几种运行模式做了介绍,本书在进行实战的过程中重点以Spark Standalone模式进行,该模式也可以在生产环境中直接部署,不依赖于其他框架模式。当然,对于需要用到Hadoop的读者,也可以尝试Spark On Yarn的部署模式,对资源进行统一的管理。下面就来一步一步地搭建Spark Standalone运行模式及Scala-Eclipse的开发环境。2.3.1 修改配置

2.1节下载和解压Spark之后,在Spark的安装目录下进入conf目录,可以看到以下几个配置文件:$ ll conf/drwxr-xr-x@ 12 xiaolitao staff 384 8 2 21:29 ./drwxr-xr-x@ 18 xiaolitao staff 576 11 5 2017 ../-rw-r--r--@ 1 xiaolitao staff 996 7 1 2017 docker.properties.template-rw-r--r--@ 1 xiaolitao staff 1105 7 1 2017 fairscheduler.xml.template-rw-r--r--@ 1 xiaolitao staff 2025 7 1 2017 log4j.properties.template-rw-r--r--@ 1 xiaolitao staff 7313 7 1 2017 metrics.properties.template-rw-r--r--@ 1 xiaolitao staff 865 7 1 2017 slaves.template-rw-r--r--@ 1 xiaolitao staff 1292 7 1 2017 spark-defaults.conf. template-rwxr-xr-x@ 1 xiaolitao staff 3699 7 1 2017 spark-env.sh.template*

conf目录中给出了很多模板文件,这里对几个常用的文件进行简单说明(默认目录中给出了后缀带有template的示例文件,在正式使用时我们需要将这个后缀去掉)。·docker.properties:当使用Docker容器时,需要就该文件进行相关修改和配置。·log4j.properties:Spark作为源生于Java和Scala的开源系统,其使用的日志服务也依赖于经典的log4j,所以当需要修改日志的显示级别,以及日志的保存文件等相关内容时,就需要在这里做相应配置。·slaves:该文件用来配置slave节点,一般将每个slave节点的IP配置在该文件中。·spark-defaults.conf:在提交Spark应用时,可以在程序内部指定相关配置,如使用的核的数量、最大占用内存数量等,另外也可以在提交命令中指定。如果两处都没有指定,就会按照该文件进行Spark应用的默认配置。·spark-env.sh:是整个Spark集群环境变量的配置文件,我们需要在该文件中配置Java和Scala的安装路径,如果需要,还要配置Hadoop的安装路径。其他配置选项,读者可以参考官方文档http://spark.apache.org/docs/latest/spark-standalone.html。

下面就来修改配置文件,搭建我们自己的集群,用于本书的所有实例。为了方便操作,我们以单一机器来搭建集群,即概念上的Master和Worker节点进行同机部署,而在实际生产环境中,只需要做相应的扩展,添加到集群中即可。集群的架构如图2.5所示。

我们以单一机器进行同机部署,以本地机器作为Master节点,同时该节点也是我们的Slave节点,并在此节点上启动两个Worker进程,图2.5中所示的两个Worker进程是Spark虚拟分配的两个IP。为了完成整个集群的搭建,我们需要简单配置以log4j.properties、slaves和spark-env.sh三个相关文件。图2.5 Spark集群部署

首先将前面介绍的模板文件的*.template的后缀template去掉,如下:$ ll conf/drwxr-xr-x@ 12 xiaolitao staff 384 8 2 21:29 ./drwxr-xr-x@ 18 xiaolitao staff 576 11 5 2017 ../-rw-r--r--@ 1 xiaolitao staff 996 7 1 2017 docker.properties.template-rw-r--r--@ 1 xiaolitao staff 1105 7 1 2017 fairscheduler.xml.template-rw-r--r--@ 1 xiaolitao staff 2025 11 5 2017 log4j.properties-rw-r--r--@ 1 xiaolitao staff 2025 7 1 2017 log4j.properties.template-rw-r--r--@ 1 xiaolitao staff 7313 7 1 2017 metrics.properties.template-rw-r--r--@ 1 xiaolitao staff 865 11 5 2017 slaves-rw-r--r--@ 1 xiaolitao staff 865 7 1 2017 slaves.template-rw-r--r--@ 1 xiaolitao staff 1292 7 1 2017 spark-defaults.conf. template-rwxr-xr-x@ 1 xiaolitao staff 3997 12 13 2017 spark-env.sh*-rwxr-xr-x@ 1 xiaolitao staff 3699 7 1 2017 spark-env.sh.template*

然后将这三个文件从模板文件中复制出来并进行简单的配置。首先对spark-env.sh也是最重要的文件进行如下配置:JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_40.jdk/Contents/HomeSCALA_HOME=/usr/local/Cellar/scala/2.11.7/SPARK_HOME=/Users/xiaolitao/Tools/spark-2.2.0-bin-hadoop2.7/SPARK_MASTER_HOST=127.0.0.1SPARK_WORKER_CORES=2SPARK_WORKER_MEMORY=7gSPARK_WORKER_INSTANCES=2SPARK_DAEMON_MEMORY=1g

首先需要配置本地Java、Scala及Spark的安装目录,以上分别配置了其相应的安装路径(大家实际配置时,需要根据自己的实际环境来配置)。其次配置Master节点的IP,由于我们使用单一机器进行部署,所以这里指定的IP为127.0.0.1。之后的配置是针对每一个Worker实例,包含2核和7GB内存,我们这里共配置了两个Worker实例(特别说明的是,根据机器的实际情况进行配置,由于本书用于试验的机器是4核16GB内存,因此这里进行了如上配置)。

最后配置守护进程的内存总量,包括Master和Worker等。我们进行了一些关键配置,Spark关于Standalone模式还提供了很多其他的配置,大家可以直接看模板文件或者参考官网(http://spark.apache.org/docs/latest/spark-standalone.html)。

对于slaves文件,由于我们目前的集群只有一台机器,所以这里仅仅配置了localhost,Spark会在slaves列出的所有机器节点中启动Worker进程,用来进行计算。

关于log4j.properties,当我们打开从模板文件复制过来的配置文件时,其中已经进行了很详细的日志输出配置,因此这里基本没有改动,主要将log4j.rootCategory的日志输出级别改为了WARN,否则会产生大量的系统日志信息,当程序发生错误的时候将很难进行定位,具体修改如下:# 设置日志信息在控制台输出log4j.rootCategory=WARN, consolelog4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.target=System.errlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n# 将日志级别设置为WARN# 会覆盖默认日志级别的设置# 针对不同的Spark应用可以设置不同的日志级别log4j.logger.org.apache.spark.repl.Main=WARN# 将第三方库的日志级别设置高些log4j.logger.org.spark_project.jetty=WARNlog4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle= ERRORlog4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFOlog4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFOlog4j.logger.org.apache.parquet=ERRORlog4j.logger.parquet=ERROR# 避免SparkSQL支持Hive中不存在的UDFs的各种噪音信息log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATALlog4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR2.3.2 启动集群

前面对Spark集群进行了一些基本的配置,一些在具体环境中用到的配置,会在后面介绍。在启动集群前还需要进行一项关键的设置。

由于Spark在进行集群中每个节点的启动、停止的时候,都用SSH进行登录操作,所以与Hadoop类似,我们也需要进行SSH免密登录的设置,在Mac OS中配置方式如下所述。(1)在终端执行ssh-keygen-t rsa,之后一直按回车键,如果本机已经执行过该命令,可能会提示覆盖源文件,选择yes选项。(2)执行cat~/.ssh/id_rsa.pub>>~/.ssh/authorized_keys,用于授权你的公钥到本地可以无须密码登录。

对于Linux操作系统,可以执行如下命令进行免密设置:ssh-keygen -t rsa -P ""cat ./.ssh/id_rsa.pub >> ./.ssh/authorized_keyschmod 600 ~/.ssh/authorized_keys

值得注意的是,如果集群中不止一个slaves节点,那么需要在每台机器上进行免密登录设置,并将其IP配置到Master节点下的conf和slaves配置文件中,这样Spark才能在配置的每个slave节点启动Worker进程。

完成上述操作,就开始激动人心的一刻——启动Spark集群了。在Spark的安装目录下进入sbin目录,具体如下:$ ll sbin/drwxr-xr-x@ 24 xiaolitao staff 768 11 6 2017 ./drwxr-xr-x@ 18 xiaolitao staff 576 11 5 2017 ../-rwxr-xr-x@ 1 xiaolitao staff 2803 7 1 2017 slaves.sh*-rwxr-xr-x@ 1 xiaolitao staff 1429 7 1 2017 spark-config.sh*-rwxr-xr-x@ 1 xiaolitao staff 5688 7 1 2017 spark-daemon.sh*-rwxr-xr-x@ 1 xiaolitao staff 1262 7 1 2017 spark-daemons.sh*-rwxr-xr-x@ 1 xiaolitao staff 1190 7 1 2017 start-all.sh*-rwxr-xr-x@ 1 xiaolitao staff 1274 7 1 2017 start-history-server.sh*-rwxr-xr-x@ 1 xiaolitao staff 2050 7 1 2017 start-master.sh*-rwxr-xr-x@ 1 xiaolitao staff 1877 7 1 2017 start-mesos-dispatcher.sh*-rwxr-xr-x@ 1 xiaolitao staff 1423 7 1 2017 start-mesos-shuffle- service.sh*-rwxr-xr-x@ 1 xiaolitao staff 1279 7 1 2017 start-shuffle-service.sh*-rwxr-xr-x@ 1 xiaolitao staff 3151 7 1 2017 start-slave.sh*-rwxr-xr-x@ 1 xiaolitao staff 1527 7 1 2017 start-slaves.sh*-rwxr-xr-x@ 1 xiaolitao staff 1857 7 1 2017 start-thriftserver.sh*-rwxrwxrwx@ 1 xiaolitao staff 1478 7 1 2017 stop-all.sh*-rwxr-xr-x@ 1 xiaolitao staff 1056 7 1 2017 stop-history-server.sh*-rwxr-xr-x@ 1 xiaolitao staff 1080 7 1 2017 stop-master.sh*-rwxr-xr-x@ 1 xiaolitao staff 1227 7 1 2017 stop-mesos-dispatcher.sh*-rwxr-xr-x@ 1 xiaolitao staff 1084 7 1 2017 stop-mesos-shuffle- service.sh*-rwxr-xr-x@ 1 xiaolitao staff 1067 7 1 2017 stop-shuffle-service.sh*-rwxr-xr-x@ 1 xiaolitao staff 1557 7 1 2017 stop-slave.sh*-rwxr-xr-x@ 1 xiaolitao staff 1064 7 1 2017 stop-slaves.sh*-rwxr-xr-x@ 1 xiaolitao staff 1066 7 1 2017 stop-thriftserver.sh*

我们会发现Spark提供了大量的shell脚本用于操作Spark集群,其中用得比较多的是以下3个脚本文件。·start-all.sh/stop-all.sh:用于启动集群上的所有节点和停止集群上的所有节点。·start-master.sh/stop-master.sh:用于单独启动Master进程和停止该进程。·start-slave.sh/stop-slave.sh:用于启动指定节点的Worker进程和停止该节点的所有Worker进程。

我们直接通过start-all.sh的脚本来启动配置的所有节点,包括一个Master进程和两个Worker进程,代码如下:$ sbin/start-all.shstarting org.apache.spark.deploy.master.Master, logging to /Users/xiaolitao/ Tools/spark-2.2.0-bin-hadoop2.7//logs/spark-xiaolitao-org.apache.spark. deploy.master.Master-1-LITAOXIAO-MC0.outlocalhost: starting org.apache.spark.deploy.worker.Worker, logging to /Users/xiaolitao/Tools/spark-2.2.0-bin-hadoop2.7//logs/spark-xiaolitao- org.apache.spark.deploy.worker.Worker-1-LITAOXIAO-MC0.outlocalhost: starting org.apache.spark.deploy.worker.Worker, logging to /Users/xiaolitao/Tools/spark-2.2.0-bin-hadoop2.7//logs/spark-xiaolitao- org.apache.spark.deploy.worker.Worker-2-LITAOXIAO-MC0.outlocalhost: starting org.apache.spark.deploy.worker.Worker, logging to /Users/xiaolitao/Tools/spark-2.2.0-bin-hadoop2.7//logs/spark-xiaolitao- org.apache.spark.deploy.worker.Worker-3-LITAOXIAO-MC0.out

Spark提供了一套完整的Web UI用于监控整个集群的情况,默认的访问端口是8080,我们也可以在之前的spark-env.sh中通过SPARK_MASTER_WEBUI_PORT/SPARK_WORKER_WEBUI_PORT参数进行修改。集群启动成功后,访问该页面如图2.6所示。图2.6 Spark集群网页监控

Spark集群中Master节点的默认端口号是7077,我们可以通过SPARK_MASTER_PORT参数在spark-env.sh中进行配置。从图2.6中

试读结束[说明:试读内容隐藏了图片]

下载完整电子书


相关推荐

最新文章


© 2020 txtepub下载