循序渐进学Spark(txt+pdf+epub+mobi电子书下载)


发布时间:2020-07-30 07:50:40

点击下载

作者:杨磊

出版社:机械工业出版社

格式: AZW3, DOCX, EPUB, MOBI, PDF, TXT

循序渐进学Spark

循序渐进学Spark试读:

前言

Spark诞生于美国加州大学伯克利分校AMP实验室。随着大数据技术在互联网、金融等领域的突破式进展,Spark在近些年得到更为广泛的应用。这是一个核心贡献者超过一半为华人的大数据平台开源项目,且正处于飞速发展、快速成熟的阶段。

为什么写这本书

Spark已经成为大数据计算、分析领域新的热点和发展方向。相对于Hadoop传统的MapReduce计算模型,Spark提供更为高效的计算框架以及更为丰富的功能,因此在大数据生产应用领域中不断攻城略地,势如破竹。

与企业不断涌现的对大数据技术的需求相比,大数据人才还存在很大缺口,对大数据技术充满期许的新人正在源源不断地加入这个领域。在小象学院的教学实践过程中,我们发现,一本能完整系统地介绍Spark各模块原理并兼顾使用实战的书,对于初入大数据领域的技术人员至关重要。于是,我们根据日常积累的经验,著成本书。

Spark作为一个高速发展的开源项目,最近也发布了全新的Spark 2.0版本。对于Spark 2.0版本的新特性,我们也专门给予描述,以期将最前沿的Spark技术奉献给读者。

本书面向的读者

·Spark初学者

·Spark应用开发人员

·Spark运维人员

·大数据技术爱好者

如何阅读本书

本书共分8章:

第1章介绍了Spark大数据处理框架的基本概念、主要组成部分、基本架构,以及Spark集群环境搭建和Spark开发环境的构建方法。

第2章引入Spark编程中的核心——RDD弹性分布式数据集,以典型的编程范例,讲解基于RDD的算子操作。

第3章主要讲述了Spark的工作机制与原理,剖析了Spark的提交和执行时的具体机制,重点强调了Spark程序的宏观执行过程。此外,更深入地剖析了Spark的存储及IO、通信机制、容错机制和Shuffle机制。

第4章对Spark的代码布局做了宏观介绍,并对Spark的执行主线进行详细剖析,从代码层面详细讲述RDD是如何落地到Worker上执行的。同时,本章从另一个角度分析了Client、Master与Worker之间的交互过程,深入讲述了Spark的两个重要功能点及Spark Shuffle与Spark存储机制。

第5章介绍了YARN的基本原理及基于YARN的Spark程序提交,并结合从程序提交到落地执行的过程,详细介绍了各个阶段的资源管理和调度职能。在本章的后半部分,主要从资源配置的角度对YARN及基于YARN的Spark做了较为详细的介绍。

第6章一一讲解了BDAS中的主要模块。由Spark SQL开始,介绍了Spark SQL及其编程模型和DataFrame。接着深入讲解Spark生态中用于流式计算的模块Spark Streaming。之后,讲解了Spark R的基本概念及操作。最后针对机器学习的流行趋势,重点介绍了Spark MLlib的架构及编程应用,以及机器学习的基本概念和基本算法。

第7章首先详细叙述了Spark调优的几个重要方面,接着给出了工业实践中常见的一些问题,以及解决问题的常用策略,最后启发读者在此基础上进一步思考和探索。

第8章描述了Spark 2.0.0发布之后,Spark Core、Spark SQL、MLlib、Spark Streaming、Spark R等模块API的变化以及新增的功能特性等。对于变化较大的Spark SQL,书中用实际的代码样例更详细地说明和讲解了SparkSession、结构化Streaming等新特性。

对于Spark的初学者或希望从零开始详细了解Spark技术的读者,请从第1章开始通读全书;对于有一定Spark基础的研究者,可从第4章开始阅读;如果只想了解Spark最基本的原理,阅读第1~3章即可。

资源和勘误

本书大量资源来源于小象学院专家团队在大数据项目开发以及Spark教学课程中的经验积累。本书内容的撰写也参考了大量官方文档(http://spark.apache.org/)。

由于Spark技术正在飞速发展,加之笔者水平有限,书中难免存在谬误,也可能存在若干技术细节描述不详尽之处,恳请读者批评指正。欢迎大家关注微信服务号“小象学院”,把您的意见或者建议反馈给我们。

致谢

首先应该感谢Apache Spark的开源贡献者们,Spark是当今大数据领域伟大的开源项目之一,没有这一开源项目,便没有本书。

本书以小象学院git项目方式管理。感谢姜冰钰、陈超、冼茂源等每一位内容贡献者,感谢他们花费大量时间,将自己对Spark的理解加上在实际工作、学习过程中的体会,融汇成丰富的内容。

感谢本书的审阅者樊明璐、杨福川、李艺,他们对本书的内容和结构提供了非常宝贵的意见。第1章Spark架构与集群环境

本章首先介绍Spark大数据处理框架的基本概念,然后介绍Spark生态系统的主要组成部分,包括Spark SQL、Spark Streaming、MLlib和GraphX,接着简要描述了Spark的架构,便于读者认识和把握,最后描述了Spark集群环境搭建及Spark开发环境的构建方法。1.1 Spark概述与架构

随着互联网规模的爆发式增长,不断增加的数据量要求应用程序能够延伸到更大的集群中去计算。与单台机器计算不同,集群计算引发了几个关键问题,如集群计算资源的共享、单点宕机、节点执行缓慢及程序的并行化。针对这几个集群环境的问题,许多大数据处理框架应运而生。比如Google的MapReduce,它提出了简单、通用并具有自动容错功能的批处理计算模型。但是MapReduce对于某些类型的计算并不适合,比如交互式和流式计算。基于这种类型需求的不一致性,大量不同于MapReduce的专门数据处理模型诞生了,如GraphLab、Impala、Storm等。大量数据模型的产生,引发的后果是对于大数据处理而言,针对不同类型的计算,通常需要一系列不同的处理框架才能完成。这些不同的处理框架由于天生的差异又带来了一系列问题:重复计算、使用范围的局限性、资源分配、统一管理,等等。1.1.1 Spark概述

为了解决上述MapReduce及各种处理框架所带来的问题,加州大学伯克利分校推出了Spark统一大数据处理框架。Spark是一种与Hadoop MapReduce类似的开源集群大数据计算分析框架。Spark基于内存计算,整合了内存计算的单元,所以相对于hadoop的集群处理方法,Spark在性能方面更具优势。Spark启用了弹性内存分布式数据集,除了能够提供交互式查询外,还可以优化迭代工作负载。

从另一角度来看,Spark可以看作MapReduce的一种扩展。MapReduce之所以不擅长迭代式、交互式和流式的计算工作,主要因为它缺乏在计算的各个阶段进行有效的资源共享,针对这一点,Spark创造性地引入了RDD(弹性分布式数据集)来解决这个问题。RDD的重要特性之一就是资源共享。

Spark基于内存计算,提高了大数据处理的实时性,同时兼具高容错性和可伸缩性,更重要的是,Spark可以部署在大量廉价的硬件之上,形成集群。

提到Spark的优势就不得不提到大家熟知的Hadoop。事实上,Hadoop主要解决了两件事情:

1)数据的可靠存储。

2)数据的分析处理。

相应地,Hadoop也主要包括两个核心部分:

1)分布式文件系统(Hadoop Distributed File System,HDFS):在集群上提供高可靠的文件存储,通过将文件块保存多个副本的办法解决服务器或硬盘故障的问题。

2)计算框架MapReduce:通过简单的Mapper和Reducer的抽象提供一个编程模型,可以在一个由几十台,甚至上百台机器组成的不可靠集群上并发地、分布式地处理大量的数据集,而把并发、分布式(如机器间通信)和故障恢复等计算细节隐藏起来。

Spark是MapReduce的一种更优的替代方案,可以兼容HDFS等分布式存储层,也可以兼容现有的Hadoop生态系统,同时弥补MapReduce的不足。

与Hadoop MapReduce相比,Spark的优势如下:

·中间结果:基于MapReduce的计算引擎通常将中间结果输出到磁盘上,以达到存储和容错的目的。由于任务管道承接的缘故,一切查询操作都会产生很多串联的Stage,这些Stage输出的中间结果存储于HDFS。而Spark将执行操作抽象为通用的有向无环图(DAG),可以将多个Stage的任务串联或者并行执行,而无须将Stage中间结果输出到HDFS中。

·执行策略:MapReduce在数据Shuffle之前,需要花费大量时间来排序,而Spark不需要对所有情景都进行排序。由于采用了DAG的执行计划,每一次输出的中间结果都可以缓存在内存中。

·任务调度的开销:MapReduce系统是为了处理长达数小时的批量作业而设计的,在某些极端情况下,提交任务的延迟非常高。而Spark采用了事件驱动的类库AKKA来启动任务,通过线程池复用线程来避免线程启动及切换产生的开销。

·更好的容错性:RDD之间维护了血缘关系(lineage),一旦某个RDD失败了,就能通过父RDD自动重建,保证了容错性。

·高速:基于内存的Spark计算速度大约是基于磁盘的Hadoop MapReduce的100倍。

·易用:相同的应用程序代码量一般比Hadoop MapReduce少50%~80%。

·提供了丰富的API:与此同时,Spark支持多语言编程,如Scala、Python及Java,便于开发者在自己熟悉的环境下工作。Spark自带了80多个算子,同时允许在Spark Shell环境下进行交互式计算,开发者可以像书写单机程序一样开发分布式程序,轻松利用Spark搭建大数据内存计算平台,并利用内存计算特性,实时处理海量数据。1.1.2 Spark生态

Spark大数据计算平台包含许多子模块,构成了整个Spark的生态系统,其中Spark为核心。

伯克利将整个Spark的生态系统称为伯克利数据分析栈(BDAS),其结构如图1-1所示。

以下简要介绍BDAS的各个组成部分。

1.Spark Core

Spark Core是整个BDAS的核心组件,是一种大数据分布式处理框架,不仅实现了MapReduce的算子map函数和reduce函数及计算模型,还提供如filter、join、groupByKey等更丰富的算子。Spark将分布式数据抽象为弹性分布式数据集(RDD),实现了应用任务调度、RPC、序列化和压缩,并为运行在其上的上层组件提供API。其底层采用Scala函数式语言书写而成,并且深度借鉴Scala函数式的编程思想,提供与Scala类似的编程接口。图1-1 伯克利数据分析栈的结构

2.Mesos

Mesos是Apache下的开源分布式资源管理框架,被称为分布式系统的内核,提供了类似YARN的功能,实现了高效的资源任务调度。

3.Spark Streaming

Spark Streaming是一种构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力。其吞吐量能够超越现有主流流处理框架Storm,并提供丰富的API用于流数据计算。

4.MLlib

MLlib是Spark对常用的机器学习算法的实现库,同时包括相关的测试和数据生成器。MLlib目前支持4种常见的机器学习问题:二元分类、回归、聚类以及协同过滤,还包括一个底层的梯度下降优化基础算法。

5.GraphX

GraphX是Spark中用于图和图并行计算的API,可以认为是GraphLab和Pregel在Spark(Scala)上的重写及优化,与其他分布式图计算框架相比,GraphX最大的贡献是,在Spark上提供一栈式数据解决方案,可以方便、高效地完成图计算的一整套流水作业。

6.Spark SQL

Shark是构建在Spark和Hive基础之上的数据仓库。它提供了能够查询Hive中所存储数据的一套SQL接口,兼容现有的Hive QL语法。熟悉Hive QL或者SQL的用户可以基于Shark进行快速的Ad-Hoc、Reporting等类型的SQL查询。由于其底层计算采用了Spark,性能比Mapreduce的Hive普遍快2倍以上,当数据全部存储在内存时,要快10倍以上。2014年7月1日,Spark社区推出了Spark SQL,重新实现了SQL解析等原来Hive完成的工作,Spark SQL在功能上全覆盖了原有的Shark,且具备更优秀的性能。

7.Alluxio

Alluxio(原名Tachyon)是一个分布式内存文件系统,可以理解为内存中的HDFS。为了提供更高的性能,将数据存储剥离Java Heap。用户可以基于Alluxio实现RDD或者文件的跨应用共享,并提供高容错机制,保证数据的可靠性。

8.BlinkDB

BlinkDB是一个用于在海量数据上进行交互式SQL的近似查询引擎。它允许用户在查询准确性和查询响应时间之间做出权衡,执行相似查询。1.1.3 Spark架构

传统的单机系统,虽然可以多核共享内存、磁盘等资源,但是当计算与存储能力无法满足大规模数据处理的需要时,面对自身CPU与存储无法扩展的先天限制,单机系统就力不从心了。

1.分布式系统的架构

所谓的分布式系统,即为在网络互连的多个计算单元执行任务的软硬件系统,一般包括分布式操作系统、分布式数据库系统、分布式应用程序等。本书介绍的Spark分布式计算框架,可以看作分布式软件系统的组成部分,基于Spark,开发者可以编写分布式计算程序。

直观来看,大规模分布式系统由许多计算单元构成,每个计算单元之间松耦合。同时,每个计算单元都包含自己的CPU、内存、总线及硬盘等私有计算资源。这种分布式结构的最大特点在于不共享资源,与此同时,计算节点可以无限制扩展,计算能力和存储能力也因而得到巨大增长。但是由于分布式架构在资源共享方面的先天缺陷,开发者在书写和优化程序时应引起注意。分布式系统架构如图1-2所示。图1-2 分布式系统架构图

为了减少网络I/O开销,分布式计算的一个核心原则是数据应该尽量做到本地计算。在计算过程中,每个计算单元之间需要传输信息,因此在信息传输较少时,分布式系统可以利用资源无限扩展的优势达到高效率,这也是分布式系统的优势。目前分布式系统在数据挖掘和决策支持等方面有着广泛的应用。

Spark正是基于这种分布式并行架构而产生,也可以利用分布式架构的优势,根据需要,对计算能力和存储能力进行扩展,以应对处理海量数据带来的挑战。同时,Spark的快速及容错等特性,让数据处理分析显得游刃有余。

2.Spark架构

Spark架构采用了分布式计算中的Master-Slave模型。集群中运行Master进程的节点称为Master,同样,集群中含有Worker进程的节点为Slave。Master负责控制整个集群的运行;Worker节点相当于分布式系统中的计算节点,它接收Master节点指令并返回计算进程到Master;Executor负责任务的执行;Client是用户提交应用的客户端;Driver负责协调提交后的分布式应用。具体架构如图1-3所示。图1-3 Spark架构

在Spark应用的执行过程中,Driver和Worker是相互对应的。Driver是应用逻辑执行的起点,负责Task任务的分发和调度;Worker负责管理计算节点并创建Executor来并行处理Task任务。Task执行过程中所需的文件和包由Driver序列化后传输给对应的Worker节点,Executor对相应分区的任务进行处理。

下面介绍Spark架构中的组件。

1)Client:提交应用的客户端。

2)Driver:执行Application中的main函数并创建SparkContext。

3)ClusterManager:在YARN模式中为资源管理器。在Standalone模式中为Master(主节点),控制整个集群。

4)Worker:从节点,负责控制计算节点。启动Executor或Driver,在YARN模式中为NodeManager。

5)Executor:在计算节点上执行任务的组件。

6)SparkContext:应用的上下文,控制应用的生命周期。

7)RDD:弹性分布式数据集,Spark的基本计算单元,一组RDD可形成有向无环图。

8)DAG Scheduler:根据应用构建基于Stage的DAG,并将Stage提交给Task Scheduler。

9)Task Scheduler:将Task分发给Executor执行。

10)SparkEnv:线程级别的上下文,存储运行时重要组件的应用,具体如下:

①SparkConf:存储配置信息。

②BroadcastManager:负责广播变量的控制及元信息的存储。

③BlockManager:负责Block的管理、创建和查找。

④MetricsSystem:监控运行时的性能指标。

⑤MapOutputTracker:负责shuffle元信息的存储。

Spark架构揭示了Spark的具体流程如下:

1)用户在Client提交了应用。

2)Master找到Worker,并启动Driver。

3)Driver向资源管理器(YARN模式)或者Master(Standalone模式)申请资源,并将应用转化为RDD Graph。

4)DAG Scheduler将RDD Graph转化为Stage的有向无环图提交给Task Scheduler。

5)Task Scheduler提交任务给Executor执行。

3.Spark运行逻辑

下面举例说明Spark的运行逻辑,如图1-4所示,在Action算子被触发之后,所有累积的算子会形成一个有向无环图DAG。Spark会根据RDD之间不同的依赖关系形成Stage,每个Stage都包含一系列函数执行流水线。图1-4中A、B、C、D、E、F为不同的RDD,RDD内的方框为RDD的分区。图1-4 Spark执行RDD Graph

图1-4中的运行逻辑如下:

1)数据从HDFS输入Spark。

2)RDD A、RDD C经过flatMap与Map操作后,分别转换为RDD B和RDD D。

3)RDD D经过reduceByKey操作转换为RDD E。

4)RDD B与RDD E进行join操作转换为RDD F。

5)RDD F通过函数saveAsSequenceFile输出保存到HDFS中。1.2 在Linux集群上部署Spark

Spark安装部署比较简单,用户可以登录其官方网站(http://spark.apache.org/downloads.html)下载Spark最新版本或历史版本,也可以查阅Spark相关文档作为参考。本书开始写作时,Spark刚刚发布1.5.0版,因此本章所述的环境搭建均以Spark 1.5.0版为例。

Spark使用了Hadoop的HDFS作为持久化存储层,因此安装Spark时,应先安装与Spark版本相兼容的Hadoop。

本节以阿里云Linux主机为例,描述集群环境及Spark开发环境的搭建过程。

Spark计算框架以Scala语言开发,因此部署Spark首先需要安装Scala及JDK(Spark1.5.0需要JDK1.7.0或更高版本)。另外,Spark计算框架基于持久化层,如Hadoop HDFS,因此本章也会简述Hadoop的安装配置。1.2.1 安装OpenJDK

Spark1.5.0要求OpenJDK1.7.0或更高版本。以本机Linux X86机器为例,OpenJDK的安装步骤如下所示:

1)查询服务器上可用的JDK版本。在终端输入如下命令:yum list "*JDK*"

yum会列出服务器上的JDK版本。

2)安装JDK。在终端输入如下命令:yum install java-1.7.0-openjdk-devel.x86cd /usr/lib/jvmln -s java-1.7.0-openjdk.x86 java-1.7

3)JDK环境配置。

①用编辑器打开/etc/profile文件,加入如下内容:export JAVA_HOME=/usr/lib/jvm/java-1.7export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin

关闭并保存profile文件。

②输入命令source/etc/profile让配置生效。1.2.2 安装Scala

登录Scala官网(http://www.scala-lang.org/download/)下载最新版本:scala-2.11.7.tgz

1)安装。tar zxvf scala-2.11.7.tgz -C /usr/localcd /usr/localln -s scala-2.11.7 scala

2)配置:打开/etc/profile,加入如下语句:export SCALA_HOME=/usr/local/scalaexport PATH=$PATH:$SCALA_HOME/bin1.2.3 配置SSH免密码登录

在分布式系统中,如Hadoop与Spark,通常使用SSH(安全协议,Secure Shell)服务来启动Slave节点上的程序,当节点数量比较大时,频繁地输入密码进行身份认证是一项非常艰难的体验。为了简化这个问题,可以使用”公私钥”认证的方式来达到SSH免密码登录。

首先在Master节点上创建一对公私钥(公钥文件:~/.ssh/id_rsa.pub;私钥文件:~/.ssh/id_rsa),然后把公钥拷贝到Worker节点上(~/.ssh/authorized_keys)。二者交互步骤如下:

1)Master通过SSH连接Worker时,Worker生成一个随机数然后用公钥加密后,发回给Master。

2)Master收到加密数后,用私钥解密,并将解密数回传给Worker。

3)Worker确认解密数正确之后,允许Master连接。

如果配置好SSH免密码登录之后,在以上交互中就无须用户输入密码了。下面介绍安装与配置过程。

1)安装SSH:yum install ssh

2)生成公私钥对:ssh-keygen-t rsa

一直按回车键,不需要输入。执行完成后会在~/.ssh目录下看到已生成id_rsa.pub与id_rsa两个密钥文件。其中id_rsa.pub为公钥。

3)拷贝公钥到Worker机器:scp~/.ssh/id_rsa.pub<用户名>@:~/.ssh

4)在Worker节点上,将公钥文件重命名为authorized_keys:mv id_rsa.pub auth-orized_keys。类似地,在所有Worker节点上都可以配置SSH免密码登录。1.2.4 Hadoop的安装配置

登录Hadoop官网(http://hadoop.apache.org/releases.html)下载Hadoop 2.6.0安装包hadoop-2.6.0.tar.gz。然后解压至本地指定目录。tar zxvf hadoop-2.6.0.tar.gz -C /usr/localln -s hadoop-2.6.0 hadoop

下面讲解Hadoop的配置。

1)打开/etc/profile,末尾加入:export HADOOP_INSTALL=/usr/local/hadoopexport PATH=$PATH:$HADOOP_INSTALL/binexport PATH=$PATH:$HADOOP_INSTALL/sbinexport HADOOP_MAPRED_HOME=$HADOOP_INSTALLexport HADOOP_COMMON_HOME=$HADOOP_INSTALLexport HADOOP_HDFS_HOME=$HADOOP_INSTALLexport YARN_HOME=$HADOOP_INSTALL

执行__source/etc/profile__使其生效,然后进入Hadoop配置目录:/usr/local/hadoop/etc/hadoop,配置Hadoop。

2)配置hadoop_env.sh。export JAVA_HOME=/usr/lib/jvm/java-1.7

3)配置core-site.xml。 fs.defaultFS hdfs://Master:9000 hadoop.tmp.dir file:/root/bigdata/tmp io.file.buffer.size 131702

4)配置yarn-site.xml。 yarn.nodemanager.aux-services mapreduce_shuffle yarn.nodemanager.auxservices.mapreduce.shuffle.class org.apache.hadoop.mapred.ShuffleHandler yarn.resourcemanager.address Master:8032 yarn.resourcemanager.scheduler.address Master:8030 yarn.resourcemanager.resource-tracker.address Master:8031 yarn.resourcemanager.admin.address Master:8033 yarn.resourcemanager.webapp.address Master:8088

5)配置mapred-site.xml。 mapreduce.framework.name yarn mapreduce.jobhistory.address Master:10020 mapreduce.jobhistory.webapp.address Master:19888

6)创建namenode和datanode目录,并配置路径。

①创建目录。mkdir -p /hdfs/namenodemkdir -p /hdfs/datanode

②在hdfs-site.xml中配置路径。 dfs.namenode.name.dir file:/hdfs/namenode dfs.datanode.data.dir file:/hdfs/datanode dfs.replication 3 dfs.namenode.secondary.http-address Master:9001 dfs.webhdfs.enabled true

7)配置slaves文件,在其中加入所有从节点主机名,例如:x.x.x.x worker1x.x.x.x worker2……

8)格式化namenode:/usr/local/hadoop/bin/hadoop namenode -format

至此,Hadoop配置过程基本完成。1.2.5 Spark的安装部署

登录Spark官网下载页面(http://spark.apache.org/downloads.html)下载Spark。这里选择最新的Spark 1.5.0版spark-1.5.0-bin-hadoop2.6.tgz(Pre-built for Hadoop2.6 and later)。

然后解压spark安装包至本地指定目录:tar zxvf spark-1.5.0-bin-hadoop2.6.tgz -C /usr/local/ln -s spark-1.5.0-bin-hadoop2.6 spark

下面让我们开始Spark的配置之旅吧。

1)打开/etc/profile,末尾加入:export SPARK_HOME=/usr/local/sparkPATH=$PATH:${SPARK_HOME}/bin

关闭并保存profile,然后命令行执行source/etc/profile使配置生效。

2)打开/etc/hosts,加入集群中Master及各个Worker节点的ip与hostname配对。x.x.x.x Master-namex.x.x.x worker1x.x.x.x worker2x.x.x.x worker3……

3)进入/usr/local/spark/conf,在命令行执行:cp spark-env.sh.template spark-env.shvi spark-env.sh

末尾加入:export JAVA_HOME=/usr/lib/jvm/java-1.7export SCALA_HOME=/usr/local/scalaexport SPARK_MASTER_IP=112.74.197.158<以本机为例>export SPARK_WORKER_MEMORY=1g

保存并退出,执行命令:cp slaves.template slavesvi slaves

在其中加入各个Worker节点的hostname。这里以四台机器(master、worker1、worker2、worker3)为例,那么slaves文件内容如下:worker1worker2worker31.2.6 Hadoop与Spark的集群复制

前面完成了Master主机上Hadoop与Spark的搭建,现在我们将该环境及部分配置文件从Master分发到各个Worker节点上(以笔者环境为例)。在集群环境中,由一台主机向多台主机间的文件传输一般使用pssh工具来完成。为此,在Master上建立一个文件workerlist.txt,其中保存了所有Worker节点的IP,每次文件的分发只需要一行命令即可完成。

1)复制JDK环境:pssh -h workerlist -r /usr/lib/jvm/java-1.7 /

2)复制scala环境:pssh -h workerlist -r /usr/local/scala /

3)复制Hadoop:pssh -h workerlist -r /usr/local/hadoop /

4)复制Spark环境:pssh -h workerlist -r /usr/local/spark /

5)复制系统配置文件:pssh -h workerlist /etc/hosts /pssh -h workerlist /etc/profile /

至此,Spark Linux集群环境搭建完毕。1.3 Spark集群试运行

下面试运行Spark。

1)在Master主机上,分别启动Hadoop与Spark。cd /usr/local/hadoop/sbin/./start-all.shcd /usr/local/spark/sbin./start-all.sh

2)检查Master与Worker进程是否在各自节点上启动。在Master主机上,执行命令jps,如图1-5所示。图1-5 在Master主机上执行jps命令

在Worker节点上,以Worker1为例,执行命令jps,如图1-6所示。

从图1-6中可以清晰地看到,Master进程与Worker及相关进程在各自节点上成功运行,Hadoop与Spark运行正常。图1-6 在Worker节点上执行jps命令

3)通过Spark Web UI查看集群状态。在浏览器中输入Master的IP与端口,打开Spark Web UI,如图1-7所示。

从图1-7中可以看到,当集群内仅有一个Worker节点时,Spark Web UI显示该节点处于Alive状态,CPU Cores为1,内存为1GB。此页面会列出集群中所有启动后的Worker节点及应用的信息。图1-7 Spark Web UI界面

4)运行样例。Spark自带了一些样例程序可供试运行。在Spark根目录下,example/src/main文件夹中存放着Scala、Java、Python及用R语言编写的样例,用户可以运行其中的某个样例程序。先拷贝到Spark根目录下,然后执行bin/run-example[class][params]即可。例如可以在Master主机命令行执行:./run-example SparkPi 10

然后可以看到该应用的输出,在Spark Web UI上也可以查看应用的状态及其他信息。1.4 Intellij IDEA的安装与配置

Intellij IDE是目前最流行的Spark开发环境。本节主要介绍Intellij开发工具的安装与配置。Intellij不但可以开发Spark应用,还可以作为Spark源代码的阅读器。1.4.1 Intellij的安装

Intellij开发环境依赖JDK、Scala。

1.JDK的安装

Intellij IDE需要安装JDK 1.7或更高版本。Open JDK1.7的安装与配置前文中已讲过,这里不再赘述。

2.Scala的安装

Scala的安装与配置前文已讲过,此处不再赘述。

3.Intellij的安装

登录Intellij官方网站(http://www.jetbrains.com/idea/)下载最新版Intellij linux安装包ideaIC-14.1.5.tar.gz,然后执行如下步骤:

1)解压:tar zxvf ideaIC-14.1.5.tar.gz-C/usr/

2)运行:到解压后的目录执行./idea.sh

3)安装Scala插件:打开“File”→“Settings”→“Plugins”→“Install JetBrain plugin”运行后弹出如图1-8所示的对话框。

单击右侧Install plugin开始安装Scala插件。1.4.2 Intellij的配置

1)在Intellij IDEA中新建Scala项目,命名为“HelloScala”,如图1-9所示。

2)选择菜单“File”→“Project Structure”→“Libraries”,单击“+”号,选择“java”,定位至前面Spark根目录下的lib目录,选中spark-assembly-1.5.0-hadoop2.6.0.jar,单击OK按钮。

3)与上一步相同,单击“+”号,选择“scala”,然后定位至前面已安装的scala目录,scala相关库会被自动引用。图1-8 Scala插件弹出窗口图1-9 在Intellij IDEA中新建Scala项目

4)选择菜单“File”→“Project Structure”→“Platform Settings”→“SDKs”,单击“+”号,选择JDK,定位至JDK安装目录,单击OK按钮。

至此,Intellij IDEA开发环境配置完毕,用户可以用它开发自己的Spark程序了。1.5 Eclipse IDE的安装与配置

现在介绍如何安装Eclipse。与Intellij IDEA类似,Eclipse环境依赖于JDK与Scala的安装。JDK与Scala的安装前文已经详细讲述过了,在此不再赘述。

对最初需要为Ecplise选择版本号完全对应的Scala插件才可以新建Scala项目。不过自从有了Scala IDE工具,问题大大简化了。因为Scala IDE中集成的Eclipse已经替我们完成了前面的工作,用户可以直接登录官网(http://scala-ide.org/download/sdk.html)下载安装。

安装后,进入Scala IDE根目录下的bin目录,执行./eclipse启动IDE。

然后选择“File”→“New”→“Scala Project”打开项目配置页。

输入项目名称,如HelloScala,然后选择已经安装好的JDK版本,单击Finish按钮。接下来就可以进行开发工作了,如图1-10所示。图1-10 已经创建好的HelloScala项目1.6 使用Spark Shell开发运行Spark程序

Spark Shell是一种学习API的简单途径,也是分析数据集交互的有力工具。

虽然本章还没涉及Spark的具体技术细节,但从总体上说,Spark弹性数据集RDD有两种创建方式:

·从文件系统输入(如HDFS)。

·从已存在的RDD转换得到新的RDD。

现在我们从RDD入手,利用Spark Shell简单演示如何书写并运行Spark程序。下面以word count这个经典例子来说明。

1)启动spark shell:cd进SPARK_HOME/bin,执行命令。./spark-shell

2)进入scala命令行,执行如下命令:scala> val file = sc.textFile("hdfs://localhost:50040/hellosparkshell")scala> val count = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_) scala> count.collect()

首先从本机上读取文件hellosparkshell,然后解析该文件,最后统计单词及其数量并输出如下:15/09/29 16:11:46 INFO spark.SparkContext: Job finished: collect at :17, took 1.624248037 sres5: Array[(String, Int)] = Array((hello,12), (spark,12), (shell,12), (this,1), (is,1), (chapter,1), (three,1)1.7 本章小结

本章着重描述了Spark的生态及架构,使读者对Spark的平台体系有初步的了解。进而描述了如何在Linux平台上构建Spark集群,帮助读者构建自己的Spark平台。最后又着重描述了如何搭建Spark开发环境,有助于读者对Spark开发工具进行一定了解,并独立搭建开发环境。第2章Spark编程模型

与Hadoop相比,Spark最初为提升性能而诞生。Spark是Hadoop MapReduce的演化和改进,并兼容了一些数据库的基本思想,可以说,Spark一开始就站在Hadoop与数据库这两个巨人的肩膀上。同时,Spark依靠Scala强大的函数式编程Actor通信模式、闭包、容器、泛型,并借助统一资源调度框架,成为一个简洁、高效、强大的分布式大数据处理框架。

Spark在运算期间,将输入数据与中间计算结果保存在内存中,直接在内存中计算。另外,用户也可以将重复利用的数据缓存在内存中,缩短数据读写时间,以提高下次计算的效率。显而易见,Spark基于内存计算的特性使其擅长于迭代式与交互式任务,但也不难发现,Spark需要大量内存来完成计算任务。集群规模与Spark性能之间呈正比关系,随着集群中机器数量的增长,Spark的性能也呈线性增长。接下来介绍Spark编程模型。2.1 RDD弹性分布式数据集

通常来讲,数据处理有几种常见模型:Iterative Algorithms、Relational Queries、Map-Reduce、Stream Processing。例如,Hadoop MapReduce采用了MapReduce模型,Storm则采用了Stream Processing模型。

与许多其他大数据处理平台不同,Spark建立在统一抽象的RDD之上,而RDD混合了上述这4种模型,使得Spark能以基本一致的方式应对不同的大数据处理场景,包括MapReduce、Streaming、SQL、Machine Learning以及Graph等。这契合了Matei Zaharia提出的原则:“设计一个通用的编程抽象(Unified Programming Abstraction)”,这也正是Spark的魅力所在,因此要理解Spark,先要理解RDD的概念。2.1.1 RDD简介

RDD(Resilient Distributed Datasets,弹性分布式数据集)是一个容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘或内存中,并控制数据的分区。RDD还提供了一组丰富的操作来操作这些数据,诸如map、flatMap、filter等转换操作实现了monad模式,很好地契合了Scala的集合操作。除此之外,RDD还提供诸如join、groupBy、reduceByKey等更为方便的操作,以支持常见的数据运算。

RDD是Spark的核心数据结构,通过RDD的依赖关系形成Spark的调度顺序。所谓Spark应用程序,本质是一组对RDD的操作。

下面介绍RDD的创建方式及操作算子类型。

·RDD的两种创建方式

·从文件系统输入(如HDFS)创建

·从已存在的RDD转换得到新的RDD

·RDD的两种操作算子

·Transformation(变换)

Transformation类型的算子不是立刻执行,而是延迟执行。也就是说从一个RDD变换为另一个RDD的操作需要等到Action操作触发时,才会真正执行。

·Action(行动)

Action类型的算子会触发Spark提交作业,并将数据输出到Spark系统。2.1.2 深入理解RDD

RDD从直观上可以看作一个数组,本质上是逻辑分区记录的集合。在集群中,一个RDD可以包含多个分布在不同节点上的分区,每个分区是一个dataset片段,如图2-1所示。图2-1 RDD分区

在图2-1中,RDD-1含有三个分区(p1、p2和p3),分布存储在两个节点上:node1与node2。RDD-2只有一个分区P4,存储在node3节点上。RDD-3含有两个分区P5和P6,存储在node4节点上。

1.RDD依赖

RDD可以相互依赖,如果RDD的每个分区最多只能被一个Child RDD的一个分区使用,则称之为窄依赖(narrow dependency);若多个Child RDD分区都可以依赖,则称之为宽依赖(wide dependency)。不同的操作依据其特性,可能会产生不同的依赖。例如,map操作会产生窄依赖,join操作则产生宽依赖,如图2-2所示。图2-2 RDD依赖

2.RDD支持容错性

支持容错通常采用两种方式:日志记录或者数据复制。对于以数据为中心的系统而言,这两种方式都非常昂贵,因为它需要跨集群网络拷贝大量数据。

RDD天生是支持容错的。首先,它自身是一个不变的(immutable)数据集,其次,RDD之间通过lineage产生依赖关系(在下章继续探讨这个话题),因此RDD能够记住构建它的操作图,当执行任务的Worker失败时,完全可以通过操作图获得之前执行的操作,重新计算。因此无须采用replication方式支持容错,很好地降低了跨网络的数据传输成本。

3.RDD的高效性

RDD提供了两方面的特性:persistence(持久化)和partitioning(分区),用户可以通过persist与partitionBy函数来控制这两个特性。RDD的分区特性与并行计算能力(RDD定义了parallerize函数),使得Spark可以更好地利用可伸缩的硬件资源。如果将分区与持久化二者结合起来,就能更加高效地处理海量数据。

另外,RDD本质上是一个内存数据集,在访问RDD时,指针只会指向与操作相关的部分。例如,存在一个面向列的数据结构,其中一个实现为Int型数组,另一个实现为Float型数组。如果只需要访问Int字段,RDD的指针可以只访问Int数组,避免扫描整个数据结构。

再者,如前文所述,RDD将操作分为两类:Transformation与Action。无论执行了多少次Transformation操作,RDD都不会真正执行运算,只有当Action操作被执行时,运算才会触发。而在RDD的内部实现机制中,底层接口则是基于迭代器的,从而使得数据访问变得更高效,也避免了大量中间结果对内存的消耗。

在实现时,RDD针对Transformation操作,提供了对应的继承自RDD的类型,例如,map操作会返回MappedRDD,flatMap则返回FlatMappedRDD。执行map或flatMap操作时,不过是将当前RDD对象传递给对应的RDD对象而已。2.1.3 RDD特性总结

RDD是Spark的核心,也是整个Spark的架构基础。它的特性可以总结如下:

1)RDD是不变的(immutable)数据结构存储。

2)RDD将数据存储在内存中,从而提供了低延迟性。

3)RDD是支持跨集群的分布式数据结构。

4)RDD可以根据记录的Key对结构分区。

5)RDD提供了粗粒度的操作,并且都支持分区。2.2 Spark程序模型

下面给出一个经典的统计日志中ERROR的例子,以便读者直观理解Spark程序模型。

1)SparkContext中的textFile函数从存储系统(如HDFS)中读取日志文件,生成file变量。scala> var file = sc.textFile("hdfs://...")

2)统计日志文件中,所有含ERROR的行。scala> var errors = file.filer(line=>line.contains("ERROR"))

3)返回包含ERROR的行数。errors.count()

RDD的操作与Scala集合非常类似,这是Spark努力追求的目标:像编写单机程序一样编写分布式应用。但二者的数据和运行模型却有很大不同,如图2-3所示。图2-3 Spark程序模型

在图2-3中,每一次对RDD的操作都造成了RDD的变换。其中RDD的每个逻辑分区Partition都对应Block Manager(物理存储管理器)中的物理数据块Block(保存在内存或硬盘上)。前文已强调,RDD是应用程序中核心的元数据结构,其中保存了逻辑分区与物理数据块之间的映射关系,以及父辈RDD的依赖转换关系。2.3 Spark算子

本节介绍Spark算子的分类及其功能。2.3.1 算子简介

Spark应用程序的本质,无非是把需要处理的数据转换为RDD,然后将RDD通过一系列变换(transformation)和操作(action)得到结果,简单来说,这些变换和操作即为算子。

Spark支持的主要算子如图2-4所示。图2-4 Spark支持的算子

根据所处理的数据类型及处理阶段的不同,算子大致可以分为如下三类:

试读结束[说明:试读内容隐藏了图片]

下载完整电子书


相关推荐

最新文章


© 2020 txtepub下载