Spark大数据分析实战(txt+pdf+epub+mobi电子书下载)


发布时间:2020-06-21 19:17:52

点击下载

作者:高彦杰,倪亚宇

出版社:机械工业出版社

格式: AZW3, DOCX, EPUB, MOBI, PDF, TXT

Spark大数据分析实战

Spark大数据分析实战试读:

前言

为什么要写这本书

Spark大数据技术还在如火如荼地发展,Spark中国峰会的召开,各地meetup的火爆举行,开源软件Spark也因此水涨船高,很多公司已经将Spark大范围落地并且应用。Spark使用者的需求已经从最初的部署安装、运行实例,到现在越来越需要通过Spark构建丰富的数据分析应用。写一本Spark实用案例类的技术书籍,是一个持续了很久的想法。由于工作较为紧张,最初只是将参与或学习过的Spark相关案例进行总结,但是随着时间的推移,最终还是打算将其中通用的算法、系统架构以及应用场景抽象出来,并进行适当简化,也算是一种总结和分享。

Spark发源于美国加州大学伯克利分校AMPLab的大数据分析平台,它立足于内存计算,从多迭代批量处理出发,兼顾数据仓库、流处理和图计算等多种计算范式,是大数据系统领域的全栈计算平台。Spark当下已成为Apache基金会的顶级开源项目,拥有着庞大的社区支持,生态系统日益完善,技术也逐渐走向成熟。

现在越来越多的同行已经了解Spark,并且开始使用Spark,但是国内缺少一本Spark的实战案例类的书籍,很多Spark初学者和开发人员只能参考网络上零散的博客或文档,学习效率较慢。本书也正是为了解决上述问题而着意编写。

本书希望带给读者一个系统化的视角,秉承大道至简的主导思想,介绍Spark的基本原理,如何在Spark上构建复杂数据分析算法,以及Spark如何与其他开源系统进行结合构建数据分析应用,让读者开启Spark技术应用之旅。

本书特色

Spark作为一款基于内存的分布式计算框架,具有简洁的接口,可以快速构建上层数据分析算法,同时具有很好的兼容性,能够结合其他开源数据分析系统构建数据分析应用或者产品。

为了适合读者阅读和掌握知识结构,本书从Spark基本概念和机制介绍入手,结合笔者实践经验讲解如何在Spark之上构建机器学习算法,并最后结合不同的应用场景构建数据分析应用。

读者对象

本书中一些实操和应用章节,比较适数据分析和开发人员,可以作为工作手边书;机器学习和算法方面的章节,比较适合机器学习和算法工程师,可以分享经验,拓展解决问题的思路。

·Spark初学者

·Spark应用开发人员

·Spark机器学习爱好者

·开源软件爱好者

·其他对大数据技术感兴趣的人员

如何阅读本书

本书分为11章内容。

第1章 从Spark概念出发,介绍Spark的来龙去脉,阐述Spark机制与如何进行Spark编程。

第2章 详细介绍Spark的开发环境配置。

第3章 详细介绍Spark生态系统重要组件Spark SQL、Spark Streaming、GraphX、MLlib的实现机制,为后续使用奠定基础。

第4章 详细介绍如何通过Flume、Kafka、Spark Streaming、HDFS、Flask等开源工具构建实时与离线数据分析流水线。

第5章 从实际出发,详细介绍如何在Azure云平台,通过Node.js、Azure Queue、Azure Table、Spark Streaming、MLlib等组件对用户行为数据进行分析与推荐。

第6章 详细介绍如何通过Twitter API、Spark SQL、Spark Streaming、Cassandra、D3等组件对Twitter进行情感分析与统计分析。

第7章 详细介绍如何通过Scrapy、Kafka、MongoDB、Spark、Spark Streaming、Elastic Search等组件对新闻进行抓取、分析、热点新闻聚类等挖掘工作。

第8章 详细介绍了协同过滤概念和模型,讲解了如何在Spark中实现基于Item-based、User-based和Model-based协同过滤算法的推荐系统。

第9章 详细介绍了社交网络分析的基本概念和经典算法,以及如何利用Spark实现这些经典算法,用于真实网络的分析。

第10章 详细介绍了主题分析模型(LDA),讲解如何在Spark中实现LDA算法,并且对真实的新闻数据进行分析。

第11章 详细介绍了搜索引擎的基本原理,以及其中用到的核心搜索排序相关算法——PageRank和Ranking SVM,并讲解了如何在Spark中实现PageRank和Ranking SVM算法,以及如何对真实的Web数据进行分析。

如果你有一定的经验,能够理解Spark的相关基础知识和使用技巧,那么可以直接阅读第4~11章。然而,如果你是一名初学者,请一定从第1章的基础知识开始学起。

勘误和支持

由于笔者的水平有限,加之编写时间仓促,书中难免会出现一些错误或者不准确的地方,恳请读者批评指正。如果你有更多的宝贵意见,我们会尽量为读者提供最满意的解答。你也可以通过微博@高彦杰gyj,博客:http://blog.csdn.net/gaoyanjie55,或者邮箱gaoyanjie55@163.com联系到高彦杰。你也可以通过邮箱niyayu@foxmail.com联系到倪亚宇。

期待能够得到大家的真挚反馈,在技术之路上互勉共进。

致谢

感谢微软亚洲研究院的Thomas先生和Ying Yan,在每一次迷茫时给予我鼓励与支持。

感谢机械工业出版社华章公司的杨福川和高婧雅,在近半年的时间里始终支持我们的写作,你们的鼓励和帮助引导我顺利完成全部书稿。

特别致谢

谨以此书献给我最亲爱的爱人,家人,同事,以及众多热爱大数据技术的朋友们!高彦杰第1章Spark简介

本章主要介绍Spark框架的概念、生态系统、架构及RDD等,并围绕Spark的BDAS项目及其子项目进行了简要介绍。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,本章只进行简要介绍,后续章节会有详细阐述。1.1 初识Spark

Spark是基于内存计算的大数据并行计算框架,因为它基于内存计算,所以提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。

1.Spark执行的特点

Hadoop中包含计算框架MapReduce和分布式文件系统HDFS。

Spark是MapReduce的替代方案,而且兼容HDFS、Hive等分布式存储层,融入Hadoop的生态系统,并弥补MapReduce的不足。(1)中间结果输出

Spark将执行工作流抽象为通用的有向无环图执行计划(DAG),可以将多Stage的任务串联或者并行执行,而无需将Stage的中间结果输出到HDFS中,类似的引擎包括Flink、Dryad、Tez。(2)数据格式和内存布局

Spark抽象出分布式内存存储结构弹性分布式数据集RDD,可以理解为利用分布式的数组来进行数据的存储。RDD能支持粗粒度写操作,但对于读取操作,它可以精确到每条记录。Spark的特性是能够控制数据在不同节点上的分区,用户可以自定义分区策略。(3)执行策略

Spark执行过程中不同Stage之间需要进行Shuffle。Shuffle是连接有依赖的Stage的桥梁,上游Stage输出到下游Stage中必须经过Shuffle这个环节,通过Shuffle将相同的分组数据拆分后聚合到同一个节点再处理。Spark Shuffle支持基于Hash或基于排序的分布式聚合机制。(4)任务调度的开销

Spark采用了事件驱动的类库AKKA来启动任务,通过线程池的复用线程来避免系统启动和切换开销。

2.Spark的优势

Spark的一站式解决方案有很多的优势,分别如下所述。(1)打造全栈多计算范式的高效数据流水线

支持复杂查询与数据分析任务。在简单的“Map”及“Reduce”操作之外,Spark还支持SQL查询、流式计算、机器学习和图算法。同时,用户可以在同一个工作流中无缝搭配这些计算范式。(2)轻量级快速处理

Spark代码量较小,这得益于Scala语言的简洁和丰富表达力,以及Spark通过External DataSource API充分利用和集成Hadoop等其他第三方组件的能力。同时Spark基于内存计算,可通过中间结果缓存在内存来减少磁盘I/O以达到性能的提升。(3)易于使用,支持多语言

Spark支持通过Scala、Java和Python编写程序,这允许开发者在自己熟悉的语言环境下进行工作。它自带了80多个算子,同时允许在Shell中进行交互式计算。用户可以利用Spark像书写单机程序一样书写分布式程序,轻松利用Spark搭建大数据内存计算平台并充分利用内存计算,实现海量数据的实时处理。(4)与External Data Source多数据源支持

Spark可以独立运行,除了可以运行在当下的Yarn集群管理之外,它还可以读取已有的任何Hadoop数据。它可以运行多种数据源,比如Parquet、Hive、HBase、HDFS等。这个特性让用户可以轻易迁移已有的持久化层数据。(5)社区活跃度高

Spark起源于2009年,当下已有超过600多位工程师贡献过代码。开源系统的发展不应只看一时之快,更重要的是一个活跃的社区和强大的生态系统的支持。

同时也应该看到Spark并不是完美的,RDD模型适合的是粗粒度的全局数据并行计算;不适合细粒度的、需要异步更新的计算。对于一些计算需求,如果要针对特定工作负载达到最优性能,还需要使用一些其他的大数据系统。例如,图计算领域的GraphLab在特定计算负载性能上优于GraphX,流计算中的Storm在实时性要求很高的场合要更胜Spark Streaming一筹。1.2 Spark生态系统BDAS

目前,Spark已经发展成为包含众多子项目的大数据计算平台。BDAS是伯克利大学提出的基于Spark的数据分析栈(BDAS)。其核心框架是Spark,同时涵盖支持结构化数据SQL查询与分析的查询引擎Spark SQL,提供机器学习功能的系统MLBase及底层的分布式机器学习库MLlib,并行图计算框架GraphX,流计算框架Spark Streaming,近似查询引擎BlinkDB,内存分布式文件系统Tachyon,资源管理框架Mesos等子项目。这些子项目在Spark上层提供了更高层、更丰富的计算范式。

图1-1展现了BDAS的主要项目结构图。图1-1 伯克利数据分析栈(BDAS)主要项目结构图

下面对BDAS的各个子项目进行更详细的介绍。(1)Spark

Spark是整个BDAS的核心组件,是一个大数据分布式编程框架,不仅实现了MapReduce的算子map函数和reduce函数及计算模型,还提供了更为丰富的算子,例如filter、join、groupByKey等。Spark将分布式数据抽象为RDD(弹性分布式数据集),并实现了应用任务调度、RPC、序列化和压缩,并为运行在其上层的组件提供API。其底层采用Scala这种函数式语言书写而成,并且所提供的API深度借鉴函数式的编程思想,提供与Scala类似的编程接口。

图1-2所示即为Spark的处理流程(主要对象为RDD)。

Spark将数据在分布式环境下分区,然后将作业转化为有向无环图(DAG),并分阶段进行DAG的调度和任务的分布式并行处理。(2)Spark SQL

Spark SQL提供在大数据上的SQL查询功能,类似于Shark在整个生态系统的角色,它们可以统称为SQL on Spark。之前,由于Shark的查询编译和优化器依赖Hive,使得Shark不得不维护一套Hive分支。而Spark SQL使用Catalyst作为查询解析和优化器,并在底层使用Spark作为执行引擎实现SQL的算子。用户可以在Spark上直接书写SQL,相当于为Spark扩充了一套SQL算子,这无疑更加丰富了Spark的算子和功能。同时Spark SQL不断兼容不同的持久化存储(如HDFS、Hive等),为其发展奠定广阔的空间。图1-2 Spark的任务处理流程图(3)Spark Streaming

Spark Streaming通过将流数据按指定时间片累积为RDD,然后将每个RDD进行批处理,进而实现大规模的流数据处理。其吞吐量能够超越现有主流流处理框架Storm,并提供丰富的API用于流数据计算。(4)GraphX

GraphX基于BSP模型,在Spark之上封装类似Pregel的接口,进行大规模同步全局的图计算,尤其是当用户进行多轮迭代的时候,基于Spark内存计算的优势尤为明显。(5)MLlib

MLlib是Spark之上的分布式机器学习算法库,同时包括相关的测试和数据生成器。MLlib支持常见的机器学习问题,例如分类、回归、聚类以及协同过滤,同时也包括一个底层的梯度下降优化基础算法。1.3 Spark架构与运行逻辑

1.Spark的架构

·Driver:运行Application的main()函数并且创建SparkContext。

·Client:用户提交作业的客户端。

·Worker:集群中任何可以运行Application代码的节点,运行一个或多个Executor进程。

·Executor:运行在Worker的Task执行器,Executor启动线程池运行Task,并且负责将数据存在内存或者磁盘上。每个Application都会申请各自的Executor来处理任务。

·SparkContext:整个应用的上下文,控制应用的生命周期。

·RDD:Spark的基本计算单元,一组RDD形成执行的有向无环图RDD Graph。

·DAG Scheduler:根据Job构建基于Stage的DAG工作流,并提交Stage给TaskScheduler。

·TaskScheduler:将Task分发给Executor执行。

·SparkEnv:线程级别的上下文,存储运行时的重要组件的引用。

2.运行逻辑(1)Spark作业提交流程

如图1-3所示,Client提交应用,Master找到一个Worker启动Driver,Driver向Master或者资源管理器申请资源,之后将应用转化为RDD有向无环图,再由DAGScheduler将RDD有向无环图转化为Stage的有向无环图提交给TaskScheduler,由TaskScheduler提交任务给Executor进行执行。任务执行的过程中其他组件再协同工作确保整个应用顺利执行。图1-3 Spark架构(2)Spark作业运行逻辑

如图1-4所示,在Spark应用中,整个执行流程在逻辑上运算之间会形成有向无环图。Action算子触发之后会将所有累积的算子形成一个有向无环图,然后由调度器调度该图上的任务进行运算。Spark的调度方式与MapReduce有所不同。Spark根据RDD之间不同的依赖关系切分形成不同的阶段(Stage),一个阶段包含一系列函数进行流水线执行。图中的A、B、C、D、E、F,分别代表不同的RDD,RDD内的一个方框代表一个数据块。数据从HDFS输入Spark,形成RDD A和RDD C,RDD C上执行map操作,转换为RDD D,RDD B和RDD E进行join操作转换为F,而在B到F的过程中又会进行Shuffle。最后RDD F通过函数saveAsSequenceFile输出保存到HDFS中。图1-4 Spark执行有向无环图1.4 弹性分布式数据集

本节将介绍弹性分布式数据集RDD。Spark是一个分布式计算框架,而RDD是其对分布式内存数据的抽象,可以认为RDD就是Spark分布式算法的数据结构,而RDD之上的操作是Spark分布式算法的核心原语,由数据结构和原语设计上层算法。Spark最终会将算法(RDD上的一连串操作)翻译为DAG形式的工作流进行调度,并进行分布式任务的分发。1.4.1 RDD简介

在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed Dataset,RDD)。它在集群中的多台机器上进行了数据分区,逻辑上可以认为是一个分布式的数组,而数组中每个记录可以是用户自定义的任意数据结构。RDD是Spark的核心数据结构,通过RDD的依赖关系形成Spark的调度顺序,通过对RDD的操作形成整个Spark程序。(1)RDD创建方式

1)从Hadoop文件系统(或与Hadoop兼容的其他持久化存储系统,如Hive、Cassandra、HBase)输入(例如HDFS)创建。

2)从父RDD转换得到新RDD。

3)通过parallelize或makeRDD将单机数据创建为分布式RDD。(2)RDD的两种操作算子

对于RDD可以有两种操作算子:转换(Transformation)与行动(Action)。

1)转换(Transformation):Transformation操作是延迟计算的,也就是说从一个RDD转换生成另一个RDD的转换操作不是马上执行,需要等到有Action操作的时候才会真正触发运算。

2)行动(Action):Action算子会触发Spark提交作业(Job),并将数据输出Spark系统。(3)RDD的重要内部属性

通过RDD的内部属性,用户可以获取相应的元数据信息。通过这些信息可以支持更复杂的算法或优化。

1)分区列表:通过分区列表可以找到一个RDD中包含的所有分区及其所在地址。

2)计算每个分片的函数:通过函数可以对每个数据块进行RDD需要进行的用户自定义函数运算。

3)对父RDD的依赖列表:为了能够回溯到父RDD,为容错等提供支持。

4)对key-value pair数据类型RDD的分区器,控制分区策略和分区数。通过分区函数可以确定数据记录在各个分区和节点上的分配,减少分布不平衡。

5)每个数据分区的地址列表(如HDFS上的数据块的地址)。

如果数据有副本,则通过地址列表可以获知单个数据块的所有副本地址,为负载均衡和容错提供支持。(4)Spark计算工作流

图1-5中描述了Spark的输入、运行转换、输出。在运行转换中通过算子对RDD进行转换。算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。

·输入:在Spark程序运行中,数据从外部数据空间(例如,HDFS、Scala集合或数据)输入到Spark,数据就进入了Spark运行时数据空间,会转化为Spark中的数据块,通过BlockManager进行管理。

·运行:在Spark数据输入形成RDD后,便可以通过变换算子fliter等,对数据操作并将RDD转化为新的RDD,通过行动(Action)算子,触发Spark提交作业。如果数据需要复用,可以通过Cache算子,将数据缓存到内存。

·输出:程序运行结束数据会输出Spark运行时空间,存储到分布式存储中(如saveAsTextFile输出到HDFS)或Scala数据或集合中(collect输出到Scala集合,count返回Scala Int型数据)。图1-5 Spark算子和数据空间

Spark的核心数据模型是RDD,但RDD是个抽象类,具体由各子类实现,如MappedRDD、ShuffledRDD等子类。Spark将常用的大数据操作都转化成为RDD的子类。1.4.2 RDD算子分类

本节将主要介绍Spark算子的作用,以及算子的分类。

Spark算子大致可以分为以下两类。

1)Transformation变换算子:这种变换并不触发提交作业,完成作业中间过程处理。

2)Action行动算子:这类算子会触发SparkContext提交Job作业。

下面分别对两类算子进行详细介绍。

1.Transformations算子

下文将介绍常用和较为重要的Transformation算子。(1)map

将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素。源码中map算子相当于初始化一个RDD,新RDD叫做MappedRDD(this,sc.clean(f))。

图1-7中每个方框表示一个RDD分区,左侧的分区经过用户自定义函数f:T->U映射为右侧的新RDD分区。但是,实际只有等到Action算子触发后这个f函数才会和其他函数在一个stage中对数据进行运算。在图1-6中的第一个分区,数据记录V1输入f,通过f转换输出为转换后的分区中的数据记录V'1。(2)flatMap

将原来RDD中的每个元素通过函数f转换为新的元素,并将生成的RDD的每个集合中的元素合并为一个集合,内部创建FlatMappedRDD(this,sc.clean(f))。图1-6 map算子对RDD转换

图1-7表示RDD的一个分区进行flatMap函数操作,flatMap中传入的函数为f:T->U,T和U可以是任意的数据类型。将分区中的数据通过用户自定义函数f转换为新的数据。外部大方框可以认为是一个RDD分区,小方框代表一个集合。V1、V2、V3在一个集合作为RDD的一个数据项,可能存储为数组或其他容器,转换为V'1、V'2、V'3后,将原来的数组或容器结合拆散,拆散的数据形成为RDD中的数据项。图1-7 flapMap算子对RDD转换(3)mapPartitions

mapPartitions函数获取到每个分区的迭代器,在函数中通过这个分区整体的迭代器对整个分区的元素进行操作。内部实现是生成MapPartitionsRDD。图1-8中的方框代表一个RDD分区。

图1-8中,用户通过函数f(iter)=>iter.filter(_>=3)对分区中所有数据进行过滤,大于和等于3的数据保留。一个方块代表一个RDD分区,含有1、2、3的分区过滤只剩下元素3。图1-8 mapPartitions算子对RDD转换(4)union

使用union函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同。并不进行去重操作,保存所有元素,如果想去重可以使用distinct()。同时Spark还提供更为简洁的使用union的API,通过++符号相当于union函数操作。

图1-9中左侧大方框代表两个RDD,大方框内的小方框代表RDD的分区。右侧大方框代表合并后的RDD,大方框内的小方框代表分区。合并后,V1、V2、V3……V8形成一个分区,其他元素同理进行合并。(5)cartesian

对两个RDD内的所有元素进行笛卡尔积操作。操作后,内部实现返回CartesianRDD。图1-10中左侧大方框代表两个RDD,大方框内的小方框代表RDD的分区。右侧大方框代表合并后的RDD,大方框内的小方框代表分区。

例如:V1和另一个RDD中的W1、W2、Q5进行笛卡尔积运算形成(V1,W1)、(V1,W2)、(V1,Q5)。图1-9 union算子对RDD转换图1-10 cartesian算子对RDD转换(6)groupBy

groupBy:将元素通过函数生成相应的Key,数据就转化为Key-Value格式,之后将Key相同的元素分为一组。

函数实现如下:

1)将用户函数预处理:val cleanF = sc.clean(f)

2)对数据map进行函数操作,最后再进行groupByKey分组操作。this.map(t =>(cleanF(t), t)).groupByKey(p)

其中,p确定了分区个数和分区函数,也就决定了并行化的程度。

图1-11中方框代表一个RDD分区,相同key的元素合并到一个组。例如V1和V2合并为V,Value为V1,V2。形成V,Seq(V1,V2)。图1-11 groupBy算子对RDD转换(7)filter

filter函数功能是对元素进行过滤,对每个元素应用f函数,返回值为true的元素在RDD中保留,返回值为false的元素将被过滤掉。内部实现相当于生成FilteredRDD(this,sc.clean(f))。

下面代码为函数的本质实现:deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))

图1-12中每个方框代表一个RDD分区,T可以是任意的类型。通过用户自定义的过滤函数f,对每个数据项操作,将满足条件、返回结果为true的数据项保留。例如,过滤掉V2和V3保留了V1,为区分命名为V'1。(8)sample

sample将RDD这个集合内的元素进行采样,获取所有元素的子集。用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方式。

内部实现是生成SampledRDD(withReplacement,fraction,seed)。

函数参数设置:

·withReplacement=true,表示有放回的抽样。

·withReplacement=false,表示无放回的抽样。

图1-13中的每个方框是一个RDD分区。通过sample函数,采样50%的数据。V1、V2、U1、U2……U4采样出数据V1和U1、U2形成新的RDD。图1-12 filter算子对RDD转换图1-13 sample算子对RDD转换(9)cache

cache将RDD元素从磁盘缓存到内存。相当于persist(MEMORY_ONLY)函数的功能。图1-14 Cache算子对RDD转换

图1-14中每个方框代表一个RDD分区,左侧相当于数据分区都存储在磁盘,通过cache算子将数据缓存在内存。(10)persist

persist函数对RDD进行缓存操作。数据缓存在哪里依据StorageLevel这个枚举类型进行确定。有以下几种类型的组合(见图1-14),DISK代表磁盘,MEMORY代表内存,SER代表数据是否进行序列化存储。

下面为函数定义,StorageLevel是枚举类型,代表存储模式,用户可以通过图1-14按需进行选择。persist(newLevel:StorageLevel)

图1-15中列出persist函数可以进行缓存的模式。例如,MEMORY_AND_DISK_SER代表数据可以存储在内存和磁盘,并且以序列化的方式存储,其他同理。图1-15 persist算子对RDD转换

图1-16中方框代表RDD分区。disk代表存储在磁盘,mem代表存储在内存。数据最初全部存储在磁盘,通过persist(MEMORY_AND_DISK)将数据缓存到内存,但是有的分区无法容纳在内存,将含有V1、V2、V3的分区存储到磁盘。(11)mapValues

mapValues:针对(Key,Value)型数据中的Value进行Map操作,而不对Key进行处理。

图1-17中的方框代表RDD分区。a=>a+2代表对(V1,1)这样的Key Value数据对,数据只对Value中的1进行加2操作,返回结果为3。图1-16 Persist算子对RDD转换图1-17 mapValues算子RDD对转换(12)combineByKey

下面代码为combineByKey函数的定义:combineByKey[C](createCombiner:(V)C,mergeValue:(C, V)C,mergeCombiners:(C, C)C,partitioner:Partitioner,mapSideCombine:Boolean=true,serializer:Serializer=null):RDD[(K,C)]

说明:

·createCombiner:V=>C,C不存在的情况下,比如通过V创建seq C。

·mergeValue:(C,V)=>C,当C已经存在的情况下,需要merge,比如把item V加到seq C中,或者叠加。

·mergeCombiners:(C,C)=>C,合并两个C。

·partitioner:Partitioner,Shuffle时需要的Partitioner。

·mapSideCombine:Boolean=true,为了减小传输量,很多combine可以在map端先做,比如叠加,可以先在一个partition中把所有相同的key的value叠加,再shuffle。

·serializerClass:String=null,传输需要序列化,用户可以自定义序列化类:

例如,相当于将元素为(Int,Int)的RDD转变为了(Int,Seq[Int])类型元素的RDD。

图1-18中的方框代表RDD分区。如图,通过combineByKey,将(V1,2),(V1,1)数据合并为(V1,Seq(2,1))。(13)reduceByKey

reduceByKey是比combineByKey更简单的一种情况,只是两个值合并成一个值,(Int,Int V)to(Int,Int C),比如叠加。所以createCombiner reduceBykey很简单,就是直接返回v,而mergeValue和mergeCombiners逻辑是相同的,没有区别。图1-18 comBineByKey算子对RDD转换

函数实现:def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = { combineByKey[V]((v: V) => v, func, func, partitioner)}

图1-19中的方框代表RDD分区。通过用户自定义函数(A,B)=>(A+B)函数,将相同key的数据(V1,2)和(V1,1)的value相加运算,结果为(V1,3)。图1-19 reduceByKey算子对RDD转换(14)join

join对两个需要连接的RDD进行cogroup函数操作,将相同key的数据能够放到一个分区,在cogroup操作之后形成的新RDD对每个key下的元素进行笛卡尔积的操作,返回的结果再展平,对应key下的所有元组形成一个集合。最后返回RDD[(K,(V,W))]。

下面代码为join的函数实现,本质是通过cogroup算子先进行协同划分,再通过flatMapValues将合并的数据打散。this.cogroup(other,partitioner).f?latMapValues{case(vs,ws)=> for(v<-vs;w>-ws)yield(v,w) }

图1-20是对两个RDD的join操作示意图。大方框代表RDD,小方框代表RDD中的分区。函数对相同key的元素,如V1为key做连接后结果为(V1,(1,1))和(V1,(1,2))。

2.Actions算子

本质上在Action算子中通过SparkContext进行了提交作业的runJob操作,触发了RDD DAG的执行。图1-20 join算子对RDD转换

例如,Action算子collect函数的代码如下,感兴趣的读者可以顺着这个入口进行源码剖析: /** * Return an array that contains all of the elements in this RDD. */ def collect(): Array[T] = {/*提交Job*/ val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) }

下面将介绍常用和较为重要的Action算子。(1)foreach

foreach对RDD中的每个元素都应用f函数操作,不返回RDD和Array,而是返回Uint。

图1-21表示foreach算子通过用户自定义函数对每个数据项进行操作。本例中自定义函数为println(),控制台打印所有数据项。图1-21 foreach算子对RDD转换(2)saveAsTextFile

函数将数据输出,存储到HDFS的指定目录。

下面为saveAsTextFile函数的内部实现,其内部通过调用saveAsHadoopFile进行实现:this.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)

将RDD中的每个元素映射转变为(null,x.toString),然后再将其写入HDFS。

图1-22中左侧方框代表RDD分区,右侧方框代表HDFS的Block。通过函数将RDD的每个分区存储为HDFS中的一个Block。(3)collect

collect相当于toArray,toArray已经过时不推荐使用,collect将分布式的RDD返回为一个单机的scala Array数组。在这个数组上运用scala的函数式操作。

图1-23中左侧方框代表RDD分区,右侧方框代表单机内存中的数组。通过函数操作,将结果返回到Driver程序所在的节点,以数组形式存储。图1-22 saveAsHadoopFile算子对RDD转换图1-23 Collect算子对RDD转换(4)count

count返回整个RDD的元素个数。

内部函数实现为:defcount():Long=sc.runJob(this,Utils.getIteratorSize_).sum

图1-24中,返回数据的个数为5。一个方块代表一个RDD分区。图1-24 count对RDD算子转换1.5 本章小结

本章首先介绍了Spark分布式计算平台的基本概念、原理以及Spark生态系统BDAS之上的典型组件。Spark为用户提供了系统底层细节透明、编程接口简洁的分布式计算平台。Spark具有内存计算、实时性高、容错性好等突出特点。同时本章介绍了Spark的计算模型,Spark会将应用程序整体翻译为一个有向无环图进行调度和执行。相比MapReduce,Spark提供了更加优化和复杂的执行流。读者还可以深入了解Spark的运行机制与Spark算子,这样能更加直观地了解API的使用。Spark提供了更加丰富的函数式算子,这样就为Spark上层组件的开发奠定了坚实的基础。

相信读者已经想了解如何开发Spark程序,接下来将就Spark的开发环境配置进行阐述。第2章Spark开发与环境配置

用户进行Spark应用程序开发,一般在用户本地进行单机开发调试,之后再将作业提交到集群生产环境中运行。下面将介绍Spark开发环境的配置,如何编译和进行源码阅读环境的配置。

用户可以在官网上下载最新的AS软件包,网址为:http://spark.apache.org/。2.1 Spark应用开发环境配置

Spark的开发可以通过Intellij或者Eclipse IDE进行,在环境配置的开始阶段,还需要安装相应的Scala插件。2.1.1 使用Intellij开发Spark程序

本节介绍如何使用Intellij IDEA构建Spark开发环境和源码阅读环境。由于Intellij对Scala的支持更好,目前Spark开发团队主要使用Intellij作为开发环境。

1.配置开发环境(1)安装JDK

用户可以自行安装JDK8。官网地址:http://www.oracle.com/technetwork/java/javase/downloads/index.html。

下载后,如果在Windows下直接运行安装程序,会自动配置环境变量,安装成功后,在CMD的命令行下输入Java,有Java版本的日志信息提示则证明安装成功。

如果在Linux下安装,下载JDK包解压缩后,还需要配置环境变量。

在/etc/profile文件中,配置环境变量:export JAVA_HOME=/usr/java/jdk1.8export JAVA_BIN=/usr/java/jdk1.8/binexport PATH=$PATH:$JAVA_HOME/binexport CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jarexport JAVA_HOME JAVA_BIN PATH CLASSPATH(2)安装Scala

Spark内核采用Scala进行开发,上层通过封装接口提供Java和Python的API,在进行开发前需要配置好Scala的开发包。

Spark对Scala的版本有约束,用户可以在Spark的官方下载界面看到相应的Scala版本号。下载指定的Scala包,官网地址:http://www.scala-lang.org/download/。(3)安装Intellij IDEA

用户可以下载安装最新版本的Intellij,官网地址:http://www.jetbrains.com/idea/download/。

目前Intellij最新的版本中已经可以支持新建SBT工程,安装Scala插件,可以很好地支持Scala开发。(4)Intellij中安装Scala插件

在Intellij菜单中选择“Configure”,在下拉菜单中选择“Plugins”,再选择“Browse repositories”,输入“Scala”搜索插件(如图2-1所示),在弹出的对话框中单击“install”按钮,重启Intellij。

2.配置Spark应用开发环境

1)用户在Intellij IDEA中创建Scala Project,SparkTest。

2)选择菜单中的“File”→“project structure”→“Libraries”命令,单击“+”,导入“spark-assembly_2.10-1.0.0-incubating-hadoop2.2.0.jar”。

只需导入该jar包,该包可以通过在Spark的源码工程下执行“sbt/sbt assembly”命令生成,这个命令相当于将Spark的所有依赖包和Spark源码打包为一个整体。

在“assembly/target/scala-2.10.4/”目录下生成:spark-assembly-1.0.0-incubating-hadoop2.2.0.jar。

3)如果IDE无法识别Scala库,则需要以同样方式将Scala库的jar包导入。之后就可以开始开发Spark程序。如图2-2所示,本例将Spark默认的示例程序SparkPi复制到文件。图2-1 输入“Scala”搜索插件图2-2 编写程序

3.运行Spark程序(1)本地运行

编写完scala程序后,可以直接在Intellij中,以本地Local模式运行(如图2-3所示),方法如下。图2-3 以local模式运行

在Intellij中的选择“Run”→“Debug Configuration”→“Edit Configurations”命令。在“Program arguments”文本框中输入main函数的输入参数local。然后右键选择需要运行的类,单击“Run”按钮运行。(2)集群上运行Spark应用jar包

如果想把程序打成jar包,通过命令行的形式运行在Spark集群中,并按照以下步骤操作。

1)选择“File”→“Project Structure”,在弹出的对话框中选择“Artifact”→“Jar”→“From Modules with dependencies”命令。

2)在选择“From Modules with dependencies”之后弹出的对话框中,选择Main函数,同时选择输出jar位置,最后单击“OK”按钮。

具体如图2-4~图2-6所示。

在图2-5中选择需要执行的Main函数。

在图2-6界面选择依赖的jar包。图2-4 生成jar包第一步图2-5 生成jar包第二步图2-6 生成jar包第三步

在主菜单选择“Build”→“Build Artifact”命令,编译生成jar包。

试读结束[说明:试读内容隐藏了图片]

下载完整电子书


相关推荐

最新文章


© 2020 txtepub下载