企业大数据处理:Spark、Druid、Flume与Kafka应用实践(txt+pdf+epub+mobi电子书下载)


发布时间:2020-05-19 12:13:13

点击下载

作者:肖冠宇

出版社:机械工业出版社

格式: AZW3, DOCX, EPUB, MOBI, PDF, TXT

企业大数据处理:Spark、Druid、Flume与Kafka应用实践

企业大数据处理:Spark、Druid、Flume与Kafka应用实践试读:

前言

我写本书的初衷是将自己在企业工作中应用的技术归纳总结,系统地将大数据处理相关技术融合在一起,给已经从事大数据相关技术研发工作的朋友,或是准备从其他行业转行进入大数据领域学习相关技术的朋友提供一份参考资料。希望本书能够帮助更多从事大数据相关工作的人,也希望通过本书结识更多热爱大数据的朋友。

目前,大数据已不只停留在概念阶段,而是在各领域成功落地,并取得了丰硕的成果。大数据已经渗透到生活中的各个方面,距离我们最近且与我们生活息息相关的大数据项目有交通大数据、医疗大数据、金融大数据、社交媒体大数据、互联网大数据等。如此多的大数据项目能够成功落地,关键原因在于数据来源的多样化,数据量的爆发式增长,新兴技术的快速发展,以及市场创新需求的不断增多,这为各种大数据项目提供了庞大的数据源,通过多种技术的综合应用,可不断挖掘出大数据背后的社会价值和商业价值。

随着开源社区的不断发展,越来越多的优秀项目被开源,以处理各种大数据场景下的问题和挑战。作为目前大数据生态系统内的早期开源项目,Hadoop在廉价机器上实现了分布式数据存储和高性能分布式计算,大大降低了数据存储和计算成本。Hadoop提供的分布式存储系统HDFS、大数据集并行计算编程模型MapReduce、资源调度框架YARN已经被广泛应用,为大数据生态系统的发展奠定了坚实的基础。如今,Hadoop大数据生态圈发展已经非常全面,涉及领域众多,在大数据处理系统中常用的技术框架包括数据采集、数据存储、数据分析、数据挖掘、批处理、实时流计算、数据可视化、监控预警、信息安全等。下图展示了大数据生态系统内比较流行并且已经在生产环境验证过的开源技术。(1)Spark

Spark是由加州大学伯克利分校AMP实验室开源的分布式大规模数据处理通用引擎,具有高吞吐、低延时、通用易扩展、高容错等特点。Spark内部提供了丰富的开发库,集成了数据分析引擎Spark SQL、图计算框架GraphX、机器学习库MLlib、流计算引擎Spark Streaming。Spark在函数式编程语言Scala中实现,提供了丰富的开发API,支持Scala、Java、Python、R等多种开发语言。同时,它提供了多种运行模式,既可以采用独立部署的方式运行,也可以依托Hadoop YARN、Apache Mesos等资源管理器调度任务运行。目前,Spark已经在金融、交通、医疗、气象等多种领域中广泛使用。大数据生态系统中的开源技术(2)Druid

Druid是由美国MetaMarkets公司创建并开源的分布式提供海量时序数据存储、支持实时多维数据分析的OLAP系统,主要应用于广告数据分析、网络系统监控等场景。Druid具有高吞吐、易扩展、高容错、低延迟、按时间序列存储等特点。(3)Flume

Flume是由Cloudera公司开发的分布式、高可用的日志收集系统,是Hadoop生态圈内的关键组件之一,目前已开源给Apache。Flume的原始版本为Flume-OG,经过对整体架构的重新设计,现已改名为Flume-NG。Flume发展到现在已经不局限于日志收集,还可以通过简单的配置收集不同数据源的海量数据并将数据准确高效地传输到不同的中心存储。目前Flume可对接的主流大数据框架有Hadoop、Kafka、ElasticSearch、Hive、HBase等。在使用Flume的过程中,通过配置文件就可以实现整个数据收集过程的负载均衡和故障转移,而不需要修改Flume的任何代码。得益于优秀的框架设计,Flume通过可扩展、插件化、组合式、高可用、高容错的设计模式,为用户提供了简单、高效、准确的轻量化大数据采集工具。(4)Kafka

Kafka是由LinkedIn开源的分布式消息队列,能够轻松实现高吞吐、可扩展、高可用,并且部署简单快速、开发接口丰富。目前,各大互联网公司已经在生产环境中广泛使用,而且已经有很多分布式处理系统支持使用Kafka,比如Spark、Strom、Druid、Flume等。(5)InfluxDB

InfluxDB是一款开源分布式时序数据库,非常适合存储监控系统收集的指标数据。时序数据库顾名思义就是按照时间顺序存储指标数据,即监控系统的场景大部分是按照时间顺序存储各项指标数据,过期时间太长的指标可能将不会再关注,所以为了提高数据库的存储率,提高查询性能,需要定期删除过期指标。InfluxDB的诸多特性非常适合监控系统的使用场景。

本书将详细介绍上述技术的原理,通过实践演示每种技术的实际应用场景。希望通过理论与实践相结合的方式使内容更通俗易懂,帮助读者根据实际的业务场景选择合适的技术方案,相信大数据在未来的发展中还会创造更多的价值。内容概述

本书分三部分展开介绍:

第一部分(第1章)主要介绍了企业大数据系统的前期准备工作,包括如何构建企业大数据处理系统的软件环境和集群环境。

第二部分(第2~7章)首先介绍了Spark的基本原理,Spark 2.0版本的Spark SQL、Structured Streaming原理和使用方法,以及Spark的多种优化方式;然后,介绍了Druid的基本原理、集群的搭建过程、数据摄入过程,以及在查询过程中如何实现Druid查询API;接着介绍了日志收集系统Flume的基本架构和关键组件,以及分层日志收集架构的设计与实践;最后介绍了分布式消息队列Kafka的基本架构和集群搭建过程,以及使用Java语言实现客户端API的详细过程。

第三部分(第8~9章)主要介绍了企业大数据处理的两个实际应用案例,分别是基于Druid构建多维数据分析平台和基于JMX指标的监控系统。目标读者

本书适合从事大数据及相关工作的工程师阅读,也适合准备进入大数据领域的大数据爱好者学习、参考。读者反馈

本书是在业余时间完成的,由于水平有限,编写时间仓促,书中可能会出现介绍不够详细或者有错误的地方,敬请读者谅解。如果遇到任何问题或者寻求技术交流都可以通过如下联系方式与笔者进行沟通。

大数据爱好者交流QQ群:124154694

个人邮箱:xiaoguanyu_java@163.com致谢

感谢在本书的写作过程中帮助过笔者的朋友、同事、老师,感谢你们一次又一次的帮助和支持!

感谢机械工业出版社杨福川老师,本书从2016年6月份开始筹划,确定了基本的框架,虽然由于笔者个人原因导致写作速度缓慢,但是杨老师一直积极推动本书的出版,并且不断指导笔者写作,感谢杨老师给予的理解、帮助与支持。感谢机械工业出版社编辑李艺老师,李艺老师用严谨的工作态度为本书做了专业的编辑工作,并且耐心指导笔者完成了本书的编写工作。

感谢乐视智能中心大数据部的同事们,感谢他们在工作中帮助笔者分担工作任务;感谢上级领导的耐心指导,使笔者能够顺利地完成工作任务并腾出时间进行写作。在此特别感谢技术总监罗宏宇、技术经理陆松林、刘韦宏、姚会航、张迪等。

感谢家人在工作和生活中对笔者的帮助和照顾。感谢父母,平时因工作原因很少回家看望,但他们一直在背后支持我、鼓励我。感谢妻子为家庭和工作的付出。家人的陪伴与支持是笔者不断学习、努力奋斗的强大后盾!第一部分准备工作第1章基础环境准备1.1 软件环境准备

软件版本选择:

操作系统:CentOS 6.6版本;JDK:1.7版本;Maven:3.2版本;Scala:2.10版本。

所有软件安装目录:/data/soft。

确定了软件版本后,我们将具体介绍软件的安装,本节主要介绍基础的软件安装方式。1.JDK安装

JDK是Java Development Kit的简称,为Java语言开发的程序提供开发工具包和运行环境。JDK安装的步骤如下:(1)下载JDK二进制安装包

wget http://download.oracle.com/otn-pub/java/jdk/7u15-b03/jdk-7u15-linux-x64.tar.gz(2)解压安装

tar -zxvf jdk-7u15-linux-x64.tar.gz(3)创建软连接

软连接相当于快捷方式,便于后续版本更新升级。

ls -s /data/soft/jdk-7u15-linux-x64 /usr/local/jdk(4)配置环境变量

vim /etc/profileexport JAVA_HOME=/usr/local/jdkexport JRE_HOME=$JAVA_HOME/jreexportCLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar :$JRE_HOME/lib:$CLASSPATHexport PATH=$PATH: $JAVA_HOME/bin

刷新环境变量使其生效:source/etc/profile(5)验证安装是否成功

查看JDK版本命令:java-version2.Maven安装

Maven是Apache开源的一个目前比较流行的项目管理和整合工具,能够自动完成项目的构建,并根据配置文件自动下载依赖组件,提供代码编译、打包、发布等功能。下面介绍Maven的详细安装过程。

Maven安装的步骤如下:(1)下载Maven二进制安装包

wget http://mirror.bit.edu.cn/apache/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz(2)解压安装

tar -zxvf apache-maven-3.3.9-bin.tar.gz(3)创建软连接

软连接相当于快捷方式,便于后续版本更新升级。

ls -s/data/soft/apache-maven-3.3.9-bin /usr/local/maven(4)配置环境变量

vim /etc/profileexport M2_HOME=/usr/local/mavenexport PATH=$PATH: $JAVA_HOME/bin:$M2_HOM/bin

刷新环境变量使其生效:source/etc/profile(5)验证安装是否成功

查看Maven版本命令:mvn-version3.Scala安装

Scala编程语言是一种面向对象的函数式编程语言,充分展现了函数式编程语言简约、高效的特点,在程序开发的过程中可以引入Java语言,可扩展性强。由于Scala具有很多优秀的特性,越来越多的开源项目使用Scala语言开发,比如Spark、Kafka等。下面详细介绍Scala开发环境的安装过程。

Scala安装的步骤如下:(1)下载JDK二进制安装包

wget http://downloads.lightbend.com/scala/2.10.6/scala-2.10.6.tgz(2)解压安装

tar -zxvf scala-2.10.6.tgz(3)创建软连接

软连接相当于快捷方式,便于后续版本更新升级。

ls -s /data/soft/scala-2.10.6 /usr/local/scala(4)配置环境变量

vim /etc/profileexport SCALA_HOME=/usr/local/scalaexport PATH=$PATH: $JAVA_HOME/bin:$M2_HOM/bin:$SCALA_HOME/bin

刷新环境变量使其生效:source/etc/profile(5)验证安装是否成功

查看scala版本命令:scala-version1.2 集群环境准备1.2.1 Zookeeper集群部署

Zookeeper是大数据系统中常用的分布式框架,主要用于公共配置管理、集群资源一致性管理、状态管理、部分分布式系统Leader选举等,下面通过完全分布式搭建方式进行介绍。1.集群规划

由于Zookeeper采用FastLeaderElection算法选举Leader,集群中过半的机器正常运行才能够成功选举Leader,为保证集群正常运行,集群部署的节点数为奇数个,最少节点个数为3,生产环境建议部署5个以上的奇数个节点,因为3个实例其中只要有一个实例不可用,整个Zookeeper集群将无法成功选举,仍然不可以提供服务。2.部署过程

本例将以三个节点的部署为例,分别在192.168.1.1、192.168.1.2、192.168.1.3三台服务器部署一个Zookeeper实例。详细部署过程如下:(1)下载安装包并解压

wget http://apache.fayea.com/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz

解压到/data/soft目录下:

tar -zxvf http://apache.fayea.com/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz-C /data/soft(2)创建软连接

创建软连接便于以后升级版本,方便统一管理。

ls -s /data/soft/zookeeper-3.4.6. /usr/local/zookeeper(3)设置环境变量

vim /etc/profileexport ZOOKEEPER_HOME=/usr/local/zookeeperexport PATH=$PATH: $JAVA_HOME/bin:$M2_HOM/bin:$SCALA_HOME/bin : $ZOOKEEPER_HOME/bin

刷新环境变量使其生效:Source/etc/profile(4)配置

进入到Zookeeper安装目录:cd/usr/local/zookeeper

拷贝一份conf目录下的配置文件,重命名为zoo.cfg:cp./conf/zoo_sample.cfg./conf/zoo.cfg

编辑配置文件设置关键参数:

tickTime=2000initLimit=5syncLimit=3dataDir=/data/zookeeper/datadataLogDir=/usr/local/zookeeper/logsclientPort=2181server.1=192.168.1.1:2888:3888server.2=192.168.1.2:2888:3888server.3=192.168.1.3:2888:3888

关键参数说明:

·tickTime:Zookeeper中的基础参考时间,所有与时间相关的设置都为tickTime时间的整数倍,单位是毫秒。

·initLimit:Zookeeper Leader与Follower初始连接时,Follower需要从Leader同步最新数据,该值表示Follower同步数据的最大超时时间,一般为整数,表示是tickTime的整数倍时间。

·syncLimit:Leader和Follower之间心跳检测的最大超时时间,超过这个时间则认为Follower已经下线。该参数值为整数,表示是tickTime的整数倍时间。

·dataDir:Zookeeper持久化数据目录,建议与安装路径不在同一个路径下。

·dataLogDir:日志文件目录。

·clientPort:监听客户端连接的端口号,默认值为2181。

·server.X=A:B:C。其中X是一个数字,表示这是第几号server;A是该server所在的IP地址;B配置该server和集群中的leader交换消息所使用的端口;C配置选举leader时所使用的端口。(5)创建myid文件

在配置参数dataDir对应的路径下新建myid文件,写入单独的一个数字,表示集群中该实例的编号,该值在集群中是唯一值,不可以重复,数字必须和zoo.cfg配置文件中的server.X中的X一一对应。(6)启动Zookeeper

bin/zkServer.sh start(7)验证安装是否成功

bin/zkServer.sh status(一个leader,两个follower)

或者在Zookeeper安装的任何一个节点执行客户端连接命令:

bin/zkCli.sh -server 192.168.1.1:21811.2.2 Hadoop部署1.Hadoop简介

Apache Hadoop是由著名的Apache基金会开源的分布式存储计算系统,能够在廉价的硬件上轻松实现高可靠、高扩展、高性能、高容错等特性。通过增加机器即可直线增加集群的存储和计算能力。Hadoop在大规模分布式系统中起着重要的作用,目前已经形成一套完整的Hadoop生态系统,并且在不断发展扩大。随着Hadoop生态系统的不断发展,Hadoop已应用到互联网、大数据交通、智能医疗、气象监测、金融服务、人工智能等众多领域。

HDFS(Hadoop Distributed File System,Hadoop分布式文件系统):通过对文件分块多备份分布式存储的方式保证数据具有高效的容错能力,并且有效提高数据的吞吐量。

MapReduce:应用于规模分布式计算的编程模型,该模型包含Map和Reduce两种编程原语。Map阶段常用于接入数据源,数据划分、过滤、整理等操作。Reduce阶段常用于接收Map阶段的数据,聚合计算,持久化结果数据。

YARN:作业调度和集群资源管理框架。目前已经有很多开源项目部署到YARN上运行,将YARN作为统一的作业调度和资源管理框架,如Spark、HBase、Tez等。2.Hadoop集群部署

本节主要介绍Hadoop2.6.4版本的Hadoop集群部署。1.集群规划

为保证集群的高可用能力,NameNode和ResourceManager都采用HA部署方式,各组件详细分布情况如表1-1所示。表1-1 Hadoop集群规划2.部署过程(1)SSH免密码登录

使用root用户登录进入到.ssh目录下

cd ~/.ssh

执行ssh-keygen-t rsa生成公钥和私钥。系统会一直提示信息,一直按回车就可以。生成私钥文件id_rsa,公钥文件id_rsa.pub,认证文件authorized_keys。

将公钥文件内容追加到认证文件中

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

在免密码登录的机器之间互相拷贝公钥然后追加到认证文件中,即可完成SSH免密码登录配置。(2)创建hadoop用户和组

groupadd hadoopuseradd -m -g hadoophadoop(3)下载安装包并解压

先安装hadoop01,然后将配置好的安装包拷贝到其他节点。

wget http://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-2.6.5/hadoop-2.6.5.tar.gz

解压到指定目录/data/soft/下

tar -zxvf hadoop-2.6.5.tar.gz-C /data/soft/(4)创建软连接并修改属主为hadoop

创建软连接便于以后升级版本,方便统一管理。

ln -s /data/soft/ hadoop-2.6.5 /usr/local/hadoopchown -R hadoop:hadoop /usr/local/hadoop(5)设置环境变量

vim /etc/profileexport HADOOP_HOME=/usr/local/hadoopexport PATH=$PATH: $JAVA_HOME/bin:$M2_HOM/bin:$SCALA_HOME/bin : $ZOOKEEPER_HOME/bin:$ HADOOP_HOME/bin

刷新环境变量使其生效

source /etc/profile(6)设置配置文件

a)HDFS相关的配置文件core-site.xml和hdfs-site.xml。

core-site.xml配置信息如下:

fs.defaultFS hdfs://ns1 hadoop.tmp.dir /usr/local/hadoop/tmp ha.zookeeper.quorum hadoop01:2181,hadoop02:2181,hadoop03:2181

hdfs-site.xml配置信息如下:

dfs.nameservices ns1 dfs.ha.namenodes.ns1 nn1,nn2 dfs.namenode.rpc-address.ns1.nn1 hadoop01:9000 dfs.namenode.http-address.ns1.nn1 hadoop01:50070 dfs.namenode.rpc-address.ns1.nn2 hadoop02:9000 dfs.namenode.http-address.ns1.nn2 hadoop02:50070 dfs.namenode.shared.edits.dir qjournal://hadoop01:8485;hadoop02:8485;hadoop03:8485/ns1 dfs.journalnode.edits.dir /usr/local/hadoop/journal dfs.ha.automatic-failover.enabled true dfs.client.failover.proxy.provider.ns1 org.apache.hadoop.hdfs.server.namenode.ha .ConfiguredFailoverProxyProvider dfs.ha.fencing.methods sshfence dfs.ha.fencing.ssh.private-key-files /root/.ssh/id_rsa dfs.datanode.failed.volumes.tolerated 2 dfs.replication 3 dfs.namenode.name.dir /data/hadoop/data1/dfs/name dfs.datanode.data.dir /data/hadoop/data1/dfs/data,/data/hadoop/data2/dfs/data dfs.block.size 536870912

向slaves文件添加datanode/nodemanager节点的hostname:

hadoop01hadoop02hadoop03

b)YARN相关配置文件。

yarn-site.xml配置信息如下:

yarn.resourcemanager.ha.enabled true yarn.resourcemanager.ha.automatic-failover.enabled true yarn.resourcemanager.recovery.enabled true yarn.resourcemanager.ha.rm-ids rm1,rm2 yarn.scheduler.fair.user-as-default-queue true yarn.resourcemanager.store.class org.apache.hadoop.yarn.server.resourcemanager.recovery .ZKRMStateStore yarn.resourcemanager.cluster-id yarn-ha yarn.resourcemanager.hostname.rm1 rm1 yarn.resourcemanager.webapp.address.rm1 ${yarn.resourcemanager.hostname.rm1}:8088 yarn.resourcemanager.hostname.rm2 rm2 yarn.resourcemanager.webapp.address.rm2 ${yarn.resourcemanager.hostname.rm2}:8088 yarn.nodemanager.resource.memory-mb 81920 yarn.nodemanager.resource.cpu-vcores 10 yarn.resourcemanager.zk-address hadoop01:2181,hadoop02:2181,hadoop03:2181 yarn.nodemanager.log-dirs file:///data/hadoop/data1/yarn/log ,file:///data/hadoop/data2/yarn/log yarn.app.mapreduce.am.resource.mb 2048 yarn.scheduler.minimum-allocation-mb 1024 yarn.scheduler.maximum-allocation-mb 8192 yarn.scheduler.minimum-allocation-vcores 1 yarn.scheduler.maximum-allocation-vcores 10 yarn.log-aggregation-enable true yarn.log-aggregation.retain-seconds 259200 yarn.nodemanager.remote-app-log-dir /data/hadoop/yarn-logs yarn.resourcemanager.scheduler.class org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair .FairSchedule yarn.scheduler.fair.allocation.file /usr/local/hadoop/etc/hadoop/fair-scheduler.xml

mapred-site.xml配置信息如下:

mapreduce.jobhistory.address hadoop01:10020 mapreduce.jobhistory.webapp.address hadoop01:19888 mapreduce.jobhistory.joblist.cache.size 200000 mapreduce.framework.name yarn mapreduce.map.memory.mb 1024 mapreduce.reduce.memory.mb 8192 mapreduce.map.java.opts -Xmx1700m -Xms900m mapreduce.reduce.java.opts -Xmx7168m -Xms3000m mapreduce.client.submit.file.replication 20 mapred.fairscheduler.poolnameproperty group.name

fair-scheduler.xml公平调度策略配置信息如下(按组分配不同的内存和vcore资源):

50000 mb,10 vcores 10 1.0 fair 80000 mb, 20 vcores 20 1.0 fair 99

将配置好的hadoop拷贝到其他节点:

scp -r /data/soft/hadoop-2.6.5 hadoo02: /data/soft/scp -r/data/soft/hadoop-2.6.5 hadoo03: /data/soft/(7)集群启动

从root用户切换到hadoop用户:

su - hadoop

启动journalnode(在hadoop01上启动所有journalnode):

cd /usr/local/hadoopsbin/hadoop-daemons.sh start journalnode

jps验证,后台进程增加JournalNode进程。

格式化HDFS:

在hadoop01上执行命令:hadoop namenode-format

格式化后会根据在core-site.xml中的hadoop.tmp.dir配置生成一个文件,拷贝该文件到另外一个NameNode节点hadoop02的/usr/local/hadoop/tmp目录下:

scp -r /usr/local/hadoop/tmp/ hadoop02:/usr/local/hadoop/

格式化ZK(在hadoop01上执行即可):

hdfs zkfc -formatZK

启动HDFS(在hadoop01上执行):

sbin/start-dfs.sh

启动YARN(在hadoop01上执行):

sbin/start-yarn.sh

Hadoop部署完成后在各个节点中使用jps命令查看各组件进程是否运行正常。如果发现有问题则查看日志进行排查。(8)可以通过浏览器访问查看

http://192.168.1.1:50070

页面显示:NameNode'hadoop01:9000'(active)

http://192.168.1.2:50070

页面显示:NameNode'hadoop02:9000'(standby)

http://192.168.1.1:80881.3 小结

本章主要介绍了构建基础环境所需要软件的安装方法,目前使用的大部分开源软件依赖于JVM,使用较为广泛的开发语言为Java.Scala,而源码编译普遍使用Maven工具。Hadoop目前已经可以作为大数据应用系统的基础系统,提供分布式数据存储、集中式资源调度、大规模分布式计算等功能。通过本章的学习初步构建了一套大数据应用系统的基础环境。第二部分核心技术第2章Spark详解2.1 Spark概述

Spark是由加州大学伯克利分校AMP实验室开源的分布式大规模数据处理通用引擎,具有高吞吐、低延时、通用易扩展、高容错等特点。Spark内部提供了丰富的开发库,集成了数据分析引擎Spark SQL、图计算框架GraphX、机器学习库MLlib、流计算引擎Spark Streaming。Spark在函数式编程语言Scala中实现,提供了丰富的开发API,支持Scala、Java、Python、R等多种开发语言。同时,Spark提供了多种运行模式,既可以采用独立部署的方式运行,也可以依托Hadoop YARN、Apache Mesos等资源管理器调度任务运行。目前,Spark已经在金融、交通、医疗、气象等多种领域中广泛使用。2.1.1 Spark概述1.核心概念介绍

Spark架构示意图如图2-1所示,下面将分别介绍各核心组件。

Client:客户端进程,负责提交作业。

Driver:一个Spark作业有一个Spark Context,一个Spark Context对应一个Driver进程,作业的main函数运行在Driver中。Driver主要负责Spark作业的解析,以及通过DAGScheduler划分Stage,将Stage转化成TaskSet提交给TaskScheduler任务调度器,进而调度Task到Executor上执行。

Executor:负责执行Driver分发的Task任务。集群中一个节点可以启动多个Executor,每一个Executor可以执行多个Task任务。

Catche:Spark提供了对RDD不同级别的缓存策略,分别可以缓存到内存、磁盘、外部分布式内存存储系统Tachyon等。

Application:提交的一个作业就是一个Application,一个Application只有一个Spark Context。

Job:RDD执行一次Action操作就会生成一个Job。

Task:Spark运行的基本单位,负责处理RDD的计算逻辑。

Stage:DAGScheduler将Job划分为多个Stage,Stage的划分界限为Shuffle的产生,Shuffle标志着上一个Stage的结束和下一个Stage的开始。

TaskSet:划分的Stage会转换成一组相关联的任务集。

RDD(Resilient Distributed Dataset):弹性分布式数据集,可以理解为一种只读的分布式多分区的数组,Spark计算操作都是基于RDD进行的,下面会有详细介绍。

DAG(Directed Acyclic Graph):有向无环图。Spark实现了DAG的计算模型,DAG计算模型是指将一个计算任务按照计算规则分解为若干子任务,这些子任务之间根据逻辑关系构建成有向无环图。图2-1 Spark架构示意图2.RDD介绍

RDD从字面上理解有些困难,我们可以认为是一种分布式多分区只读的数组,Spark计算操作都是基于RDD进行的。RDD具有几个特性:只读、多分区、分布式,可以将HDFS块文件转换成RDD,也可以由一个或多个RDD转换成新的RDD,失效自动重构。基于这些特性,RDD在分布式环境下能够被高效地并行处理。(1)计算类型

在Spark中RDD提供Transformation和Action两种计算类型。Transformation操作非常丰富,采用延迟执行的方式,在逻辑上定义了RDD的依赖关系和计算逻辑,但并不会真正触发执行动作,只有等到Action操作才会触发真正执行操作。Action操作常用于最终结果的输出。

常用的Transformation操作如表2-1所示。表2-1 常用的Transformation操作及其描述

常用的Action操作如表2-2所示。表2-2 常用的Action操作及其描述

从HDFS文件生成Spark RDD,经过map、filter、join等多次Transformation操作,最终调用saveAsTextFile Action操作将结果集输出到HDFS,并以文件形式保存。RDD的流转过程如图2-2所示。图2-2 RDD的流转过程示意图(2)缓存

在Spark中RDD可以缓存到内存或者磁盘上,提供缓存的主要目的是减少同一数据集被多次使用的网络传输次数,提高Spark的计算性能。Spark提供对RDD的多种缓存级别,可以满足不同场景对RDD的使用需求。RDD的缓存具有容错性,如果有分区丢失,可以通过系统自动重新计算。

在代码中可以使用persist()方法或cache()方法缓存RDD。cache()方法默认将RDD缓存到内存中,cache()方法和persist()方法都可以用unpersist()方法来取消RDD缓存。示例如下:

val fileDataRdd = sc.textFile("hdfs://data/hadoop/test.text")fileDataRdd.cache() // 缓存RDD到内存

或者

fileDataRdd.persist(StorageLevel.MEMORY_ONLY)fileDataRdd..unpersist() // 取消缓存Spark的所有缓存级别定义在

Spark的所有缓存级别定义在org.apache.spark.storage.StorageLevel对象中,如下所示。

object storageLevel extends scala.AnyRef with scala.Serializable { val NONE : org.apache.spark.storage.StorageLevel val DISK_ONLY : org.apache.spark.storage.StorageLevel val DISK_ONLY_2 : org.apache.spark.storage.StorageLevel val MEMORY_ONLY : org.apache.spark.storage.StorageLevel val MEMORY_ONLY_2 : org.apache.spark.storage.StorageLevel val MEMORY_ONLY_SER : org.apache.spark.storage.StorageLevel val MEMORY_ONLY_SER_2 : org.apache.spark.storage.StorageLevel val MEMORY_AND_DISK : org.apache.spark.storage.StorageLevel val MEMORY_AND_DISK_2 : org.apache.spark.storage.StorageLevel val MEMORY_AND_DISK_SER : org.apache.spark.storage.StorageLevel val MEMORY_AND_DISK_SER_2 : org.apache.spark.storage.StorageLevel val OFF_HEAP : org.apache.spark.storage.StorageLevel

试读结束[说明:试读内容隐藏了图片]

下载完整电子书


相关推荐

最新文章


© 2020 txtepub下载