SparkSQL内核剖析(txt+pdf+epub+mobi电子书下载)


发布时间:2020-05-14 19:31:39

点击下载

作者:朱锋,张韶全

出版社:电子工业出版社

格式: AZW3, DOCX, EPUB, MOBI, PDF, TXT

SparkSQL内核剖析

SparkSQL内核剖析试读:

前言

极其迅速的信息传播将人们带入了大数据时代,也推动了大数据技术的发展。Spark于2009年诞生于伯克利大学AMP实验室,至今已经形成完整的生态圈。除参与度高的开源社区外,各种相关的技术分享和论坛(如每年的Spark Summ it)也是如火如荼。得益于其灵活的RDD计算模型,Spark系统高效地支持了各类应用,涉及SQL处理、图计算和机器学习等。

本书重点讲解Spark SQL,该系统在企业中的应用非常广泛,也是Spark生态圈中较活跃的部分。从另一个视角来看,Spark SQL是近年来SQL-on-Hadoop解决方案(包括Hive、Presto和Impala等)中的佼佼者,结合了数据库SQL处理和Spark分布式计算模型两个方面的技术,目标是取代传统的数据仓库。

在实际生产环境中,因为一些个性化的需求,往往涉及对原生的Spark SQL系统进行定制化的改造或新特性的添加,此过程需要开发人员对内部实现有深入的了解。然而笔者发现,目前业界在这方面的资料还比较缺乏,虽然已经涌现了一系列的文章和书籍,但内容通常都以Spark本身为主,或者停留在API使用和概括性介绍层面,难以满足开发人员的需求。

本书定位于弥补这方面的空白,对Spark的基本概念和功能(如RDD和调度等)不再展开讲解,而是将内容重点放在SQL内核实现的剖析上,旨在同读者一起“入于其中”,从源码实现上学习分布式计算和数据库领域的相关技术。

本书面向的读者

本书主要面向在企业中任职的系统架构师和软件开发人员,以及对大数据、分布式计算和数据库系统实现感兴趣的研究人员。需要注意的是,本书对读者在大数据系统和数据库方面的基础知识(SQL)上有一定的要求。对于初学者来说,最好能够首先参考相关资料,做到有所了解。

本书的主要内容

本书的内容可以分成4个部分:(1)背景和基础知识概述(第1~2章),这两章分别介绍Spark SQL的前生今世和与Spark相关的基础知识,对此熟悉的读者可以直接跳过;(2)Spark SQL功能实现的各个阶段(第3~6章),这4章结合简单例子分别从整体和每个阶段的细节介绍内部机制,涉及SQL编译、逻辑计划和物理计划;(3)专题展开(第7~10章),这4章重点介绍Spark SQL中Aggregation和Join实现,深入分析Tungsten计划中的几项优化技术,以及Spark SQL连接Hive的实现;(4)实践部分(第11章),这一章分享Spark SQL系统在生产环境中的应用和一些改造优化经验。

一些约定和说明

•相关术语:Spark依赖于JVM,主体采用Scala开发语言,部分功能也用到了Java语言来实现。本书没有严格地区分两种语言的术语,例如Scala语言中的trait(特质)和Java语言中的interface(接口)等,书中一般以Java语言的术语为主。此外,本书涉及的源码分析较多,不方便直接翻译的类或接口命名,以其英文命名为主。一些SQL关键词(例如Select、Join等)或Spark术语(例如Shuffl e、Partition、Executor等)在不同上下文环境中也会出现大小写混用的情况。另外,Map和Reduce虽然是来自MapReduce中的概念,但本书在介绍Spark SQL时也使用了这两个概念,分别用来表示Shuffl e前的阶段和Shuffl e后的阶段。

•版本说明:本书使用的Spark版本是Spark 2.X。笔者在写作时,以2.1版本和2.2版本中的实现机制为主。然而,Spark社区活跃,版本的演化非常迅速(平均半年一个版本),读者在理解基本的框架和思路后可以结合JIRA上的相关Issue和对应的Patch进行跟踪。当然,后期最好的方式是参与到社区的贡献中。

•推荐的阅读方式:本书内容涉及的实现细节较多,因此建议的阅读方式是结合代码进行理解。调试环境搭建好之后,在关键步骤插入日志信息,纵向(宏观)和横向(细节)分析交叉进行,最终做到在脑海中将上层的SQL语句映射为底层的RDD模型。

前沿技术的整理和分析并不是一件轻松的工作,从大纲的确定、内容的选择到最终出版得益于多方的大力支持。在此感谢电子工业出版社的各位编辑对本书出版提供的帮助,感谢马朋勃、马骉和邓飞等提出的宝贵修改建议。写作是一个不断学习并进行归纳和整理的过程,笔者在写作中也受到相关技术博客和论文思路的启发,在此一并感谢。

因笔者水平有限,本书的错漏和不足之处欢迎广大读者朋友批评指证。如果有更好的建议,也欢迎通过电子邮件联系几位作者:朱锋(wellfengzhu@gmail.com)、张韶全(shaoquan.zhang@hotmail.com)和黄明(andyyehoo@gmail.com)。第1章Spark SQL背景

技术的诞生往往都有着特定的历史背景,而对技术来龙去脉的了解有助于我们从宏观层面把握全局方向。本章从大数据概念产生以来10多年的技术发展轨迹讲起,简要回顾Spark SQL的演化历程。1.1大数据与Spark系统

大数据一词,最早出现于20世纪90年代,由数据仓库之父Bill Inmon所提及。2008年,Nature杂志出版了大数据专刊“Big Data”,专门讨论海量数据对互联网、经济、环境和生物等各方面的影响与挑战。2011年,Science出版了如何应对数据洪流(Data deluge)的专刊“Dealing w ith Data”,指出如何利用大数据中宝贵的数据价值来推动人类社会的发展。迄今为止,大数据并没有统一的标准定义,业界和学术界通常用5个方面的属性(5V)来描述大数据的特点:Volume(体量大)、Velocity(时效高)、Variety(类型多)、Veracity(真实性)、Value(价值大)。

大数据一方面意味着巨大的信息价值,另一方面也带来了技术上的挑战,使得传统的计算机技术难以在合理的时间内达到数据存储、处理和分析的目的。大数据应用的爆发性增长,已经衍生出独特的架构,并直接推动了存储、网络和计算技术的研究。Google公司于[1]2003年在SOSP会议上发表论文介绍分布式文件系统GFS,于2004年在OSDI会议上发表论文介绍分布式大数据编程模型与处理框架[2]MapReduce,于2006年再次在OSDI会议上发表论文介绍分布式数[3]据库BigTable的实现。以上三者统称为Google公司初期大数据技术的“三驾马车”,自此各种大数据存储与处理技术开始蓬勃发展。[4]

Spark分布式计算框架是大数据处理领域的佼佼者,由美国加[5]州大学伯克利分校的AMP实验室开发。相比于流行的Hadoop系统,Spark优势明显。Spark一方面提供了更加灵活丰富的数据操作方式,有些需要分解成几轮MapReduce作业的操作,可以在Spark里一轮实现;另一方面,每轮的计算结果都可以分布式地存放在内存中,下一轮作业直接从内存中读取上一轮的数据,节省了大量的磁盘IO开销。因此,对于机器学习、模式识别等迭代型计算,Spark在计算速度上通常可以获得几倍到几十倍的提升。得益于Spark对Hadoop计算的兼容,以及对迭代型计算的优异表现,成熟之后的Spark系统得到了广泛的应用。例如,在大部分公司中,典型的场景是将Hadoop(HDFS)作为大数据存储的标准,而将Spark作为计算引擎的核心。

经过多年的发展,Spark已成为目前大数据处理领域炙手可热的顶级开源项目。屈指一算,Spark从诞生到2018年,已经走过了整整9个年头。如图1.1所示,第一个版本的Spark诞生在2009年,代码量仅有3900行左右,其中还包含600行的例子和300多行的测试代码。当时,Hadoop在国外已经开始流行,但是其MapReduce编程模型较为笨拙和烦琐,Matei借鉴Scala的Collection灵感,希望开发出一套能像操作本地集合一样简捷、高效操作远程大数据的框架,并能运行于[6]Mesos平台上,于是就有了Spark最初的0.1版本和0.2版本。后来,Reynold Xin加入这个项目,在协助对Core模块进行开发的同时,在[7]其之上启动了Shark项目,希望能够让Spark更好地处理SQL任务,[8]替代当时流行的Hive(基于Hadoop的数据仓库解决方案)。当然,这个版本的Shark在多个方面都存在先天的不足,Spark在后来的发展过程中将其废弃,另起炉灶,从头来过,这也就是众所周知的Spark [9]SQL,相关细节会在后面进一步详述。图1.1 Spark发展历程

在此期间,Spark经历了一个蓬勃的生长期,从2012年的0.6版本开始,Core模块开始趋于稳定,接近生产级别,不再是实验室的产物。我国的阿里巴巴团队开始将其用于线上正式作业,并取得了比使用MapReduce更好的效果。同时,Intel公司也开始投入力量到该项目的开发中。在0.7版本中,Tathagata Das开始加入Stream ing模块,使得Spark具备准实时的流处理能力。到了0.8版本,Spark正式支持[10]在YARN上的部署运行,Yahoo公司贡献了重要的代码,国内的阿里巴巴团队也开始将其正式部署到内部的云梯集群,搭建了300台专[11]用Spark集群。在0.9版本中,图处理系统GraphX正式成为独立模块,同年,Apache接受Spark成为顶级项目,从孵化期到正式项目,只经历了短短半年时间,这种速度在Apache开源社区是非常难得[12]的。到了1.0版本,孟祥瑞等人加入DataBricks公司,主导的MLLib也成为正式模块。至此,各个主要模块形成了较完整的Spark技术栈(生态系统),如图1.2所示。

可以看出,在2012—2014年,Spark经历了一个高速的发展过程,各个模块快速演进,各大公司和全球顶尖开发人员的加入,使得整个项目充满生命力和活力。DataBricks公司成立后,多数客户的需求集中在常用的数据处理方面,而这需要Spark系统有完善且强大的SQL能力。因此,在2014—2017年,Spark技术栈重点关注Spark SQL子项目,在钨丝计划(Tungsten)的基础上,开始了DataFrame和DataSet为用户接口核心的SQL功能开发,使得Spark SQL项目发展迅速,整体内核也针对SQL做了很多优化。从1.6版本开始,社区的发展一步一个脚印,到如今功能完善的2.x版本,Spark SQL已经非常成熟,完全达到了商用的程度。图1.2 Spark技术栈1.2关系模型与SQL语言

计算机软件系统离不开底层的数据,而数据的存储管理需求促进了数据库技术的发展。早期的数据库包含网状数据库和层次数据库等[13]不同类型。在20世纪70年代,IBM的研究员E.F.Codd提出关系模型。因为具有严格的数学理论基础、较高的抽象级别,而且便于理解和使用,所以关系模型成为现代数据库产品的主流,并占据统治地位40多年。此外,IBM公司研究人员将关系模型中的数学准则以关键字语法表现出来,于1974年里程碑式地提出了SQL(Structured Query [14]Language)语言。SQL语言是一种声明式的语言,提供了查询、操纵、定义和控制等数据库生命周期中的全部操作。另外,建立在关系模型之上并提供SQL语言支持的关系数据库管理系统(如Oracle、DB2、SQLServer等),已经成为企业通用的数据存储解决方案。对数据管理人员和应用开发人员来说,关系模型和SQL语言一直是必不可少的技术基础。

尽管非结构化数据和半结构化数据在数据量上占有绝大部分比例,但结构化数据和SQL需求仍旧具有举足轻重的作用。当Hadoop生态系统进入企业时(注:广义上Spark也可以看作是Hadoop生态系统中的一员),必须面对的一个问题就是怎样解决和应对传统成熟的信息架构。在企业内部,如何处理原有的结构化数据是企业进入大数据领域所面临的难题。Hadoop生态系统的初衷在于解决日志文件分析、互联网点击流、倒排索引和大文件存储等非结构化数据的问题。在此背景的驱动下,面向Hadoop生态系统的SQL查询处理技术及框[15—17]架(统称为“SQL-on-Hadoop”)应运而生。SQL-on-Hadoop解决方案为用户提供关系模型和SQL查询接口,并透明地将存储与查询转换为Hadoop生态系统的对应技术来管理海量结构化数据。

作为数据分析领域重要的支撑,SQL-on-Hadoop成为近几年来广[18]受关注的热点,并涌现出大量的产品,如典型的Hive、Impala和[19]Presto等。所以,从横向来看,Spark SQL算是SQL-on-Hadoop解决方案大家庭中的重要一员。SQL-on-Hadoop并非专指某一个特定的系统,而是借助于Hadoop生态系统完成SQL功能的解决方案的总称。因此,一个具体的SQL-on-Hadoop系统往往依赖于Hadoop生态[20]系统中的分布式存储技术(如HDFS、HBase等),或者利用分布式计算框架(如MapReduce、Tez、Spark等),具有高度的灵活性。[21]例如,Hive既可以转换为MapReduce计算框架,也能够转换为Tez这种DAG的计算方式。此外,对于关系数据表的访问,Spark SQL既支持直接存储在HDFS上,又可以通过连接器(Connector)连接存储在类似HBase这种具有特定数据模型的系统中。而Impala则将SQL语言转换为自定义的分布式计算框架,来访问存储在HDFS或HBase等NoSQL中的数据。

需要注意的是,Spark SQL这类SQL-on-Hadoop解决方案和传统的MPP解决方案在架构上存在很大差异。根据Hadoop生态系统的特点,SQL-on-Hadoop解决方案从架构上来看可以简单地划分为三层结构。最上层是应用(语言)层,应用层为用户提供数据管理查询的接口,不同的SQL-on-Hadoop系统往往提供各自的SQL语法特性,如[22]Hive的HiveQL、Pig的PigLatin和Spark SQL的DataFrame等。在大[23]数据场景下,应用层也包含一些针对特别需求的接口,如BlinkDB所支持的近似查询功能等。应用层之下是分布式执行层,SQL-on-Hadoop系统通过一定的规则或策略将SQL语句转换为对应的计算模型。除MapReduce、Spark等通用的计算框架外,分布式执行层可能是某些系统自定义的计算单元,例如Impala中的Query Exec Engine等。分布式执行层通过接口访问数据存储层中的数据,并完成相应的计算任务。SQL-on-Hadoop系统的底层是数据存储层,主要负责对关系数据表这样的逻辑视图进行存储与管理。目前,各种SQL-on-Hadoop数据存储层基本都支持分布式文件系统HDFS和分布式NoSQL数据库。总的来看,SQL-on-Hadoop解决方案类似“堆积木”,各层之间松耦合,并可以灵活组合。数据存储层与分布式执行层之间通过特定的数据读写接口进行数据的交互。这种分层解耦的方式一方面具有通用性(各层可以分别分离为子系统)和灵活性(彼此能够互相组合)的优势,另一方面隔离了各层的特性,限制了深度集成优化的空间。1.3Spark SQL发展历程[7]

Spark SQL的前身是Shark,即“Hiveon Spark”,由Reynold Xin主导开发。Shark项目最初启动于2011年,当时Hive几乎算是唯一的SQL-on-Hadoop选择方案。Hive将SQL语句翻译为MapReduce,正如前文所提到的,性能会受限于MapReduce计算模型,始终无法满足各种交互式SQL分析的需求,因此许多机构仍然依赖传统的企业数据仓库(EDW)。Shark的提出就是针对这种需求的,目标是既能够达到EDW的性能,又能够具有MapReduce的水平扩展功能。

Shark建立在Hive代码的基础上,只修改了内存管理、物理计划、执行3个模块中的部分逻辑。Shark通过将Hive的部分物理执行计划交换出来(“swappingout thephysicalexecution engine partofHive”),最终将HiveQL转换为Spark的计算模型,使之能运行在Spark引擎上,从而使得SQL查询的速度得到10~100倍的提升。此外,Shark的最大特性是与Hive完全兼容,并且支持用户编写机器学习或数据处理函数,对HiveQL执行结果进行进一步分析。

但是,随着Spark的不断发展,Shark对Hive的重度依赖体现在架构上的瓶颈越来越突出。一方面,Hive的语法解析和查询优化等模块本身针对的是MapReduce,限制了在Spark系统上的深度优化和维护;另一方面,过度依赖Hive制约了Spark的“One Stack Ruleem All”既定方针,也制约了技术栈中各个组件的灵活集成。在此背景下,Spark SQL项目被提出来,由M ichaelArmbrust主导开发。Spark SQL抛弃原有Shark的架构方式,但汲取了Shark的一些优点,如内存列存储(In-Memory Columnar Storage)、Hive兼容性等,重新开发了SQL各个模块的代码。由于摆脱了对Hive的依赖,Spark SQL在数据兼容、性能优化、组件扩展方面都得到了极大的提升。在2014年7月1日的Spark峰会上,Databricks公司宣布终止对Shark的开发,将后续重点放到Spark SQL上。Spark SQL涵盖了Shark的所有特性,用户可以从Shark进行无缝升级,至此,Shark的发展画上了句号。Spark SQL开始迎来蓬勃的发展阶段。如今,Spark SQL已经成为Apache Spark中最为活跃的子项目。1.4本章小结

尽管各式各样的计算模型、处理框架、平台系统层出不穷,令人眼花缭乱难以选择,但是万变不离其宗,底层所涉及的分布式技术、编译原理、数据库理论等基础知识都是通用的。作为开发人员,抓住这些系统背后通用的核心技术才是根本。

本书写作的目的即是如此,旨在从技术细节上深入剖析一个典型系统的实现,并在此基础上分享一些开发实践的经验。Spark SQL作为Spark技术体系中最为活跃的子项目,发展迅速,其内部实现也越来越复杂,许多技术具有广泛的借鉴意义。本书面向大数据开发人员和期望对内部原理有深入了解的用户,内容以Spark 2.1/2.2版本为主,注重源码层面细节的分析,希望本书能为读者的大数据之旅提供帮助。第2章Spark基础知识介绍

作为学习Spark SQL的预备内容,本章用少量篇幅讲解Spark系统的基础知识,包括RDD编程模型、DataFrame和Dataset用户接口。目前关于Spark技术的文章和书籍有很多,熟悉的读者可以直接跳过本章。2.1RDD编程模型

编程模型的灵活程度决定了分布式系统的适用范围。实际上在Spark被提出之前,业界已经出现了大量的分布式计算框架,并提供了各种编程接口抽象(Abstraction)。例如,在典型的MapReduce编程模型中,用户只需要编写Map和Reduce接口即可实现业务逻辑;[24]又如Apache的Hama系统,直接提供了BSP模型的编程接口。

虽然在编程接口的种类和丰富程度上已经比较完善了,但这些系统普遍都缺乏操作分布式内存的接口抽象,导致很多应用在性能上非常低效。这些应用的共同特点是需要在多个并行操作之间重用工作数据集,典型的场景就是机器学习和图应用中常用的迭代算法(每一步对数据执行相似的函数)。例如,许多机器学习算法需要将当前迭代权值调优后的结果数据集作为下次迭代的输入,在使用MapReduce计算框架经过一次Reduce操作后输出数据会写回磁盘,然后从磁盘读取作为下次迭代的输入,这种密集的磁盘IO操作方式极大地降低了性能。

针对这类数据重用的需求,研发人员也尝试过各种解决方案。例[25]如,Google的迭代式图计算系统Pregel会将中间数据存储在内存[26]中,HaLoop在基本的MapReduce接口上又扩展了一个专门用于迭代的接口。然而,这些方案都存在很大的局限性,针对的也只是特定的计算需求,数据重用隐藏在系统实现背后,没有将重用逻辑显式地抽象出来形成通用接口。[27]

RDD则是直接在编程接口层面提供了一种高度受限的共享内存模型,如图2.1所示。RDD是Spark的核心数据结构,全称是弹性分布式数据集(ResilientDistributed Dataset),其本质是一种分布式的内存抽象,表示一个只读的数据分区(Partition)集合。一个RDD通常只能通过其他的RDD转换而创建。RDD定义了各种丰富的转换操作(如map、join和fi lter等),通过这些转换操作,新的RDD包含了如何从其他RDD衍生所必需的信息,这些信息构成了RDD之间的依赖关系(Dependency)。依赖具体分为两种,一种是窄依赖,RDD之间分区是一一对应的;另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系。窄依赖中的所有转换操作可以通过类似管道(Pipeline)的方式全部执行,宽依赖意味着数据需要在不同节点之间Shuffl e传输。图2.1 RDD核心抽象

RDD计算的时候会通过一个compute函数得到每个分区的数据。若RDD是通过已有的文件系统构建的,则com pute函数读取指定文件系统中的数据;如果RDD是通过其他RDD转换而来的,则compute函数执行转换逻辑,将其他RDD的数据进行转换。RDD的操作算子包括两类,一类是transformation,用来将RDD进行转换,构建RDD的依赖关系;另一类称为action,用来触发RDD的计算,得到RDD的相关计算结果或将RDD保存到文件系统中。

在Spark中,RDD可以创建为对象,通过对象上的各种方法调用来对RDD进行转换。经过一系列的transformation逻辑之后,就可以调用action来触发RDD的最终计算。通常来讲,action包括多种方式,可以是向应用程序返回结果(show、count和collect等),也可以是向存储系统保存数据(saveAsTextFile等)。在Spark中,只有遇到action,才会真正地执行RDD的计算(注:这被称为惰性计算,英文为Lazy Evqluation),这样在运行时可以通过管道的方式传输多个转换。

总结而言,基于RDD的计算任务可描述为:从稳定的物理存储(如分布式文件系统HDFS)中加载记录,记录被传入由一组确定性操作构成的DAG(有向无环图),然后写回稳定存储。RDD还可以将数据集缓存到内存中,使得在多个操作之间可以很方便地重用数据集。总的来讲,RDD能够很方便地支持MapReduce应用、关系型数据处理、流式数据处理(Stream Processing)和迭代型应用(图计算、机器学习等)。

在容错性方面,基于RDD之间的依赖,一个任务流可以描述为DAG。在实际执行的时候,RDD通过Lineage信息(血缘关系)来完成容错,即使出现数据分区丢失,也可以通过Lineage信息重建分区。如果在应用程序中多次使用同一个RDD,则可以将这个RDD缓存起来,该RDD只有在第一次计算的时候会根据Lineage信息得到分区的数据,在后续其他地方用到这个RDD的时候,会直接从缓存处读取而不用再根据Lineage信息计算,通过重用达到提升性能的目的。虽然RDD的Lineage信息可以天然地实现容错(当RDD的某个分区数据计算失败或丢失时,可以通过Lineage信息重建),但是对于长时间迭代型应用来说,随着迭代的进行,RDD与RDD之间的Lineage信息会越来越长,一旦在后续迭代过程中出错,就需要通过非常长的Lineage信息去重建,对性能产生很大的影响。为此,RDD支持用checkpoint机制将数据保存到持久化的存储中,这样就可以切断之前的Lineage信息,因为checkpoint后的RDD不再需要知道它的父RDD,可以从checkpoint处获取数据。

从用户角度来讲,如果要使用Spark进行数据处理和分析,需要编写一个Driver程序,并将其提交到集群执行。RDD在API层次上是Spark系统中底层,且最为灵活,自从DataFrame和Dataset接口(注:2.2节中将介绍)出现后,直接使用RDD的场景越来越少。但是,作为底层支持,熟悉RDD的原理对于理解Spark SQL物理执行阶段的实现逻辑非常重要。如图2.2所示是常见的WordCount案例。图2.2 RDD编程模型(WordCount案例)

众所周知,这个简单的应用程序用来统计一个数据集中每个单词出现的次数。在Spark的程序实现中,首先构造SparkContext对象;然后调用textFile方法从HDFS中加载数据以得到初始的RDD(记为rdd_1),其中每条记录为数据中的一行句子;接下来调用flatMap方法进行RDD的转换,将一行句子切分为多个独立的单词,得到新的RDD(记为rdd_2);再通过map方法将每个词映射为key-value形式,其中key为词本身,value为初始计数值1,再次得到一个新的RDD(记为rdd_3);最后,采用reduceByKey方法将rdd_3中的所有记录归并,统计得到每个单词的计数,转换为最终的RDD(记为rdd_4)。这里需要注意,rdd_1到rdd_4都不会触发执行,直到调用collect这个操作后,整个DAG才开始调度执行。2.2DataFram e与Dataset

对于数据分析开发人员来说,一套直观易用且富有表达力的API能够极大地提高生产力。Spark在RDD基础上,提供了DataFrame和Dataset用户编程接口,并且在跨语言(Scala、Java、Python和R)方面具有很好的支持。为了追求简化,降低开发人员的学习成本,从Spark 2.0开始,DataFrame和Dataset进行了统一。

DataFrame与RDD一样,都是不可变分布式弹性数据集。不同之处在于,RDD中的数据不包含任何结构信息,数据的内部结构可以被看作“黑盒”,因此,直接使用RDD时需要开发人员实现特定的函数来完成数据结构的解析;而DataFrame中的数据集类似于关系数据库中的表,按列名存储,具有Schema信息,开发人员可以直接将结构化数据集导入DataFrame。DataFrame的数据抽象是命名元组(对应Row类型),相比RDD多了数据特性,因此可以进行更多的优化。例如,使用DataFrame来实现WordCount,构造RDD之后,直接用toDF得到只有一列(列名为“line”)的DataFrame,然后调用explode方法将一行数据展开,最后使用groupBy方法完成聚合逻辑。

Dataset是比DataFrame更为强大的API,如图2.3所示,两者整合之后DataFrame本质上是一种特殊的Dataset(Dataset[Row]类型)。Dataset具有两个完全不同的API特征:强类型(Strongly-Typed)API和弱类型(Untyped)API。强类型一般通过Scala中定义的CaseClass或Java中的Class来指定。图2.3 DataFrame与Dataset

作为DataFrame的扩展,Dataset结合了RDD和DataFrame的优点,提供类型安全和面向对象的编程接口,并引入了编码器(Encoder)的概念。典型的Dataset创建与使用案例如下,其中定义的Person类就起到了Encoder的作用。在映射的过程中,Encoder首先检查定义的Person类的类型是否与数据相符,如果不相符(例如age字段大于Long的最大值等),则能够及时提供有用的错误信息,防止以不正确的方式处理数据。类似于DataFrame,Dataset创建之后也能够很容易地使用lambda表达式进行各种转换操作。

Encoder不仅能够在编译阶段完成类型安全检查,还能够生成字节码与堆外数据进行交互,提供对各个属性的按需访问,而不必对整个对象进行反序列化操作,极大地减少了网络数据传输的代价。此外,DataFrame和Dataset这些高级的API,能够利用Spark SQL中的Optim izer和Tungsten技术自动完成存储和计算的优化,降低内存使用,并极大地提升性能。2.3本章小结

本章简单地介绍了Spark系统架构和编程接口中的基本概念。DataFrame从API上借鉴了R和Python语言中Pandas的DataFrame的概念,是业界标准结构化数据处理接口。严格来讲,DataFrame和Dataset都属于Spark SQL模块,每个操作都可以被看作是一条完整SQL语句的“零碎”逻辑。对这些基本概念的了解有助于理解后续内容。从下一章开始,将分析Spark SQL底层的实现与优化机制。第3章Spark SQL执行全过程概述

从高层的语言到底层的计算模型通常都包含各种复杂的中间转换。具体到SQL语言,在数据库几十年的发展过程中,整个架构体系已经非常成熟,各种相似系统(如Hive和Im pala等)的实现均大同小异。本章通过一个简单的案例对整个执行过程进行概述,并简单介绍Spark SQL内部机制中涉及的基本概念和数据结构。3.1从SQL到RDD:一个简单的案例

在典型的Spark SQL应用场景中,数据的读取、数据表的创建和分析都是必不可少的过程。通常来讲,SQL查询所面对的数据模型以关系表为主。如图3.1所示的案例显示了使用Spark SQL进行数据分析的一般步骤。图3.1 一个简单的案例

案例中涉及的操作分为3步。(1)创建SparkSession类。从2.0版本开始,SparkSession逐步取代SparkContext成为Spark应用程序的入口。(2)创建数据表并读取数据。这里假设数据存储在本地名为student的json文件中,包含3条记录且每条记录包含3个列(分别对应学生的id、name和age)。本案例创建了同样名为student的数据表(视图)。(3)通过SQL进行数据分析。在SparkSession类的sql方法中可以输入任意满足语法的语句,本案例所查询的数据是年龄在18岁以上的学生名字。

值得一提的是,上述案例第2步创建数据表时虽然没有显示调用SQL语句(如关系数据库中的“create table”等),但其本质上也是SQL中的一种(DDL操作),在内部转换执行时,所涉及的流程和第3步执行SQL查询的流程类似。因此,从一般性考虑,后续内容只对第3步背后的实现进行分析。

这里首先从通用的角度介绍SQL转换的过程。一般来讲,对于Spark SQL系统,从SQL到Spark中RDD的执行需要经过两个大的阶段,分别是逻辑计划(LogicalPlan)和物理计划(PhysicalPlan),如图3.2所示。图3.2 SQL执行全过程概览

逻辑计划阶段会将用户所写的SQL语句转换成树型数据结构(逻辑算子树),SQL语句中蕴含的逻辑映射到逻辑算子树的不同节点。顾名思义,逻辑计划阶段生成的逻辑算子树并不会直接提交执行,仅作为中间阶段。最终逻辑算子树的生成过程经历3个子阶段,分别对应未解析的逻辑算子树(Unresolved LogicalPlan,仅仅是数据结构,不包含任何数据信息等)、解析后的逻辑算子树(Analyzed LogicalPlan,节点中绑定各种信息)和优化后的逻辑算子树(Optim ized LogicalPlan,应用各种优化规则对一些低效的逻辑计划进行转换)。

物理计划阶段将上一步逻辑计划阶段生成的逻辑算子树进行进一步转换,生成物理算子树。物理算子树的节点会直接生成RDD或对RDD进行transformation操作(注:每个物理计划节点中都实现了对RDD进行转换的execute方法)。同样地,物理计划阶段也包含3个子阶段:首先,根据逻辑算子树,生成物理算子树的列表Iterator[PhysicalPlan](同样的逻辑算子树可能对应多个物理算子树);然后,从列表中按照一定的策略选取最优的物理算子树(SparkPlan);最后,对选取的物理算子树进行提交前的准备工作,例如,确保分区操作正确、物理算子树节点重用、执行代码生成等,得到“准备后”的物理算子树(Prepared SparkPlan)。经过上述步骤后,物理算子树生成的RDD执行action操作(如例子中的show),即可提交执行。

从SQL语句的解析一直到提交之前,上述整个转换过程都在Spark集群的Driver端进行,不涉及分布式环境。SparkSession类的sql方法调用SessionState中的各种对象,包括上述不同阶段对应的SparkSqlParser类、Analyzer类、Optim izer类和SparkPlanner类等,最后封装成一个QueryExecution对象。因此,在进行Spark SQL开发时,可以很方便地将每一步生成的计划单独剥离出来分析。

回到前面的案例,SQL语句较为简单(不包含Join和Aggregation等操作),因此其转换过程也相对简单。如图3.3所示,左上角是SQL语句,生成的逻辑算子树中有Relation、Filter和Project节点,分别对应数据表、过滤逻辑(age>18)和列剪裁逻辑(只涉及3列中的2列)。下一步的物理算子树从逻辑算子树一对一映射得到,Relation逻辑节点转换为FileSourceScanExec执行节点,Filter逻辑节点转换为FilterExec执行节点,Project逻辑节点转换为ProjectExec执行节点。图3.3 实际转换过程

生成的物理算子树根节点是ProjectExec,每个物理节点中的execute函数都是执行调用接口,由根节点开始递归调用,从叶子节点开始执行。图3.3下方展示了物理算子树的执行逻辑,与直接采用RDD进行编程类似。需要注意的是,FileSourceScanExec叶子执行节点中需要构造数据源对应的RDD,FilterExec和ProjectExec中的execute函数对RDD执行相应的transformation操作。

总的来看,SQL转换为RDD在流程上比较清晰。虽然实际生产环境中的SQL语句非常复杂,涉及的映射操作也比较烦琐,但总体上仍然遵循上述步骤。在后续章节会详细剖析这一整套转换流程。第4章会介绍如何从SQL解析为逻辑计划,第5章对逻辑计划(LogicalPlan)中的各个子阶段进行详细分析,第6章介绍物理计划(PhysicalPlan)中的实现机制。3.2重要概念

Spark SQL内部实现上述流程中平台无关部分的基础框架称为Catalyst。在深入分析流程每个阶段的原理之前,本节先简要介绍Catalyst中涉及的重要概念和数据结构,主要包括InternalRow体系、TreeNode体系和Expression体系。3.2.1 InternalRow体系

数据处理首先需要考虑如何表示数据。对于关系表来讲,通常操作的数据都是以“行”为单位的。在Spark SQL内部实现中,InternalRow就是用来表示一行行数据的类,因此图3.3中物理算子树节点产生和转换的RDD类型即为RDD[InternalRow]。此外,InternalRow中的每一列都是Catalyst内部定义的数据类型。

从类的定义来看,InternalRow作为一个抽象类,包含num Fields和update方法,以及各列数据对应的get与set方法,但具体的实现逻辑体现在不同的子类中。需要注意的是,InternalRow中都是根据下标来访问和操作列元素的。如图3.4所示,整个InternalRow体系比较简单,其具体的实现不多,包括BaseGenericInternalRow、UnsafeRow和JoinedRow 3个直接子类。图3.4 InternalRow体系

•BaseGenericInternalRow:同样是一个抽象类,实现了InternalRow中定义的所有get类型方法,这些方法的实现都通过调用类中定义的genericGet虚函数进行,该函数的实现在下一级子类中。

•JoinedRow:顾名思义,该类主要用于Join操作,将两个InternalRow放在一起形成新的InternalRow。使用时需要注意构造参数的顺序。

•UnsafeRow:不采用Java对象存储的方式,避免了JVM中垃圾回收(GC)的代价。此外,UnsafeRow对行数据进行了特定的编码,使得存储更加高效。作为Tungsten计划的重要内容,相关实现在第9章中会涉及。

从直接子类继续往下,BaseGenericInternalRow也衍生出3个子类,分别是GenericInternal-Row、SpecificInternalRow和MutableUnsafeRow类。其中,MutableUnsafeRow和UnsafeRow相关,用来支持对特定的列数据进行修改,这里暂时不作介绍。下面主要介绍GenericInternalRow和SpecificInternalRow。

阅读以上代码,可见GenericInternalRow构造参数是Array[Any]类型,采用对象数组进行底层存储,genericGet也是直接根据下标访问的。这里需要注意,数组是非拷贝的,因此一旦创建,就不允许通过set操作进行改变。而SpecificInternalRow则是以Array[MutableValue]为构造参数的,允许通过set操作进行修改。3.2.2 TreeNode体系

从案例的转换过程可以看到,无论是逻辑计划还是物理计划,都离不开中间数据结构。在Catalyst中,对应的是TreeNode体系。TreeNode类是Spark SQL中所有树结构的基类,定义了一系列通用的集合操作和树遍历操作接口。

如图3.5所示,TreeNode内部包含一个Seq[BaseType]类型的变量children来表示孩子节点。TreeNode定义了foreach、map、collect等针对节点操作的方法,以及transformUp和transformDown等遍历节点并对匹配节点进行相应转换的方法。TreeNode本身是scala.Product类型,因此可以通过productElement函数或productIterator迭代器对CaseClass参数信息进行索引和遍历。实际上,TreeNode一直在内存里维护,不会dum p到磁盘以文件形式存储,且无论在映射逻辑执行计划阶段,还是优化逻辑执行计划阶段,树的修改都是以替换已有节点的方式进行的。图3.5 TreeNode体系

TreeNode提供的仅仅是一种泛型,实际上包含了两个子类继承体系,即图3.5中的QueryPlan和Expression体系。Expression是Catalyst中的表达式体系,下一节会展开介绍。QueryPlan类下面又包含逻辑算子树(LogicalPlan)和物理执行算子树(SparkPlan)两个重要的子类,其中逻辑算子树在Catalyst中内置实现,可以剥离出来直接应用到其他系统中;而物理算子树SparkPlan和Spark执行层紧密相关,当Catalyst应用到其他计算模型时,可以进行相应的适配修改。

作为基础类,TreeNode本身仅提供了最简单和最基本的操作。图3.6列举了TreeNode中现有的一些方法,例如不同遍历方式的transform系列方法、用于替换新的子节点的w ithNewChildren方法等。此外,treeString函数能够将TreeNode以树型结构展示,在查看表达式、逻辑算子树和物理算子树时经常用到。图3.6 TreeNode基本操作

除上述操作外,Catalyst中还提供了节点位置功能,即能够根据TreeNode定位到对应的SQL字符串中的行数和起始位置。该功能在SQL解析发生异常时能够方便用户迅速找到出错的地方,具体参见如下代码。

可以看到,Origin提供了line和startPosition两个构造参数,分别代表行号和偏移量。在CurrentOrigin对象中,提供了各种set和get操作。其中,比较重要的是w ithOrigin方法,支持在TreeNode上执行操作的同时修改当前origin信息。

试读结束[说明:试读内容隐藏了图片]

下载完整电子书


相关推荐

最新文章


© 2020 txtepub下载