深度实践Spark机器学习(txt+pdf+epub+mobi电子书下载)


发布时间:2020-06-24 06:18:24

点击下载

作者:吴茂贵

出版社:机械工业出版社

格式: AZW3, DOCX, EPUB, MOBI, PDF, TXT

深度实践Spark机器学习

深度实践Spark机器学习试读:

前言

为什么写这本书

大数据、人工智能正在改变或颠覆各行各业,包括我们的生活。大数据、人工智能方面的人才已经供不应求,但作为人工智能的核心——机器学习,因涉及的知识和技能比较多,除了需要具备一定的数学基础、相关业务知识外,还要求有比较全面的技术储备,如操作系统、数据库、开发语言、数据分析工具、大数据计算平台等,无形中提高了机器学习的门槛。如何降低机器学习的门槛,让更多有志于机器学习、人工智能的人能更方便或顺畅地使用、驾驭机器学习?

很多企业也正在考虑和处理这方面的问题,本书也希望借Spark技术在这方面做一些介绍或总结。

如何使原本复杂、专业性强的工作或操作简单化?封装是一个有效方法。封装降低了我们操作照相机的难度、降低了我们维护各种现代设备的成本,同时也提升了我们使用这些设备的效率。除封装外,过程的标准化、流程化同样是目前现代企业用于提升生产效率,降低成本,提高质量的有效方法。

硬件如此,软件行业同样如此。目前很多机器学习的开发语言或平台,正在这些方面加大力度,比如:对特征转换、特征选择、数据清理、数据划分、模型评估及优化等算法的封装;对机器学习过程的进行流程化、标准化、规范化;给大家比较熟悉的语言或工具提供API等方法或措施,以简化机器学习中间过程,缩短整个开发周期,使我们能更从容地应对市场的变化。Spark在这方面可谓后来居上,尤其是最近发布的版本,明显加大了这方面的力度,我们可以从以下几个方面看出这种趋势:

1)Spark机器学习的API,正在由基于RDD过渡到基于Dataset或DataFrame,基于RDD的API在Spark2.2后处于维护阶段,Spark3.0后将停止使用(来自Spark官网);

2)建议大家使用Spark ML,尤其是它的Pipeline;

3)增加大量特征选择、特征转换、模型选择和优化等算法;

4)丰富、增强Spark与Java、Python、R的API,使其更通用。

SKLearn、Spark等机器学习平台或工具在这方面都处于领先的地位,我们也希望借助本书,把Spark在这方面的有关内容介绍给大家,使大家可以少走些弯路。

此外,Spark目前主要涉及常用机器学习算法,缺乏对一般神经网络的支持,更不用说深度学习了,这好像也是目前Spark的一个不足。不过好消息是:雅虎把深度学习框架TensorFlow与Spark整合在一起,而且开源了这些代码。为弥补广大Spark爱好者的上述缺憾,本书介绍了TensorFlowOnSpark,其中包括深度学习框架TensorFlow的基础知识及使用卷积神经网络、循环神经网络等的一些实际案例。

另外,我们提供了与本书环境完全一致的免费云操作环境,这样一来是希望节约您的宝贵时间,二来是希望能通过真正的实战,给您不一样的体验和收获!总之,我们希望能使更多有志于大数据、人工智能的朋友加入这个充满生机、前景广阔的行业中来。本书特色

本书最大特点就是注重实战!或许有读者会问,能从哪几个方面体现出来?

1)介绍了目前关于机器学习的新趋势,并分析了如何使用Pipeline使机器学习过程流程化。

2)简介了机器学习的一般框架Spark、深度学习框架Tensorflow及把两者整合在一起的框架TensorflowOnSpark。

3)提供可操作、便执行及具有实战性的项目及其详细代码。

4)提供与书完全一致的云操作环境,而且这个环境可以随时随地使用实操环境,登录地址为http://www.feiguyun.com/spark/support。

5)除了代码外,还附有一些必要的架构或原理说明,便于大家能从一个更高的角度来理解把握相关问题。

总之,希望你通过阅读本书,不但可以了解很多内容或代码,更可以亲自运行或调试这些代码,从而带来新的体验和收获!读者对象

·对大数据、机器学习感兴趣的广大在校、在职人员。

·对Spark机器学习有一定基础,欲进一步提高开发效率的人员。

·熟悉Python、R等工具,希望进一步拓展到Spark机器学习的人。

·对深度学习框架TensorFlow及其拓展感兴趣的读者。如何阅读本书

本书正文共14章,从内容结构来看,可以分为四部分。

第一部分为第1~7章,主要介绍了机器学习的一些基本概念,包括如何构建一个Spark机器学习系统,Spark ML主要特点,Spark ML中流水线(Pipeline),ML中大量特征选取、特征转换、特征选择等函数或方法,同时简单介绍了Spark MLlib的一些基础知识。

第二部分为第8~12章,主要以实例为主,具体说明如何使用Spark ML中Pipeline的Stage,以及如何把这Stage组合到流水线上,最后通过评估指标,优化模型。

第三部分即第13章,与之前的批量处理不同,这一章主要以在线数据或流式数据为主,介绍Spark的流式计算框架Spark Streaming。

第四部分即第14章,为深度学习框架,主要包括TensorFlow的基础知识及它与Spark的整合框架TensorFlowOnSpark。

此外,书中的附录部分还提供了线性代数、概率统计及Scala的基础知识,以帮助读者更好地掌握机器学习的相关内容。勘误和支持

除封面署名外,参加本书编写、环境搭建的人还有杨本法、张魁、刘未昕等、杨本法负责第12章Spark R的编写,张魁、刘未昕负责后台环境的搭建和维护。由于笔者水平有限,加之编写时间仓促,书中难免出现错误或不准确的地方。恳请读者批评指正,你可以通过访问http://www.feiguyun.com留下宝贵意见。也可以通过微信(wumg3000)或QQ(1715408972)给我们反馈。非常感谢你的支持和帮助。致谢

在本书编写过程中,得到很多在校老师和同学的支持!感谢上海大学机电工程与自动化学院的王佳寅老师及黄文成、杨中源、熊奇等同学,上海理工管理学院的张帆老师,上海师大数理学院的田红炯、李昭祥老师,华师大的王旭同学,博世王冬,飞谷云小伙伴等提供的支持和帮助。

感谢机械工业出版社的杨福川、李艺老师给予本书的大力支持和帮助。

感谢参与本书编写的其他作者及提供支持的家人们,谢谢你们!第1章了解机器学习

大数据、人工智能是目前大家谈论比较多的话题,它们的应用也越来越广泛,与我们的生活关系也越来越密切,影响也越来越深远,其中很多已进入寻常百姓家,如无人机、网约车、自动导航、智能家电、电商推荐、人机对话机器人等。

大数据是人工智能的基础,而使大数据转变为知识或生产力,离不开机器学习(Machine Learning),可以说机器学习是人工智能的核心,是使机器具有类似人的智能的根本途径。

本章主要介绍与机器学习有关的概念,机器学习与大数据、人工智能间的关系,机器学习常用架构及算法等,具体如下:

·机器学习的定义

·大数据与机器学习

·机器学习与人工智能、深度学习

·机器学习的基本任务

·如何选择合适算法

·Spark在机器学习方面的优势1.1 机器学习的定义

机器学习是什么?是否有统一或标准定义?目前好像没有,即使在机器学习的专业领域,也没有一个被广泛认可的定义。在维基百科上对机器学习有以下几种定义:(1)机器学习是一门人工智能的科学,该领域的主要研究对象是人工智能,特别是如何在经验学习中改善具体算法的性能。(2)机器学习是对能通过经验自动改进的计算机算法的研究。(3)机器学习是用数据或以往的经验来优化计算机程序的性能标准。

一种经常引用的英文定义是:A computer program is said to learn from experience(E)with respect to some class of tasks(T)and performance(P)measure,if its performance at tasks in T,as measured by P,improves with experience E。

可以看出机器学习强调三个关键词:算法、经验、性能,其处理过程如图1-1所示。图1-1 机器学习处理流程

图1-1表明机器学习是使数据通过算法构建出模型,然后对模型性能进行评估,评估后的指标如果达到要求就用这个模型测试新数据,如果达不到要求就要调整算法重新建立模型,再次进行评估,如此循环往复,最终获得满意结果。1.2 大数据与机器学习

我们已进入大数据时代,产生数据的能力迅速增长,如互联网、移动互联网、物联网、成千上万的传感器、穿戴设备、GPS等都会产生大量数据,存储数据、处理数据等能力也得到了几何级数的提升,如利用Hadoop、Spark技术为我们存储、处理大数据提供有效方法。

数据就是信息,就是依据,其背后隐含了大量不易被我们感官识别的信息、知识、规律等,如何揭示这些信息、规则、趋势,正成为当下能给企业带来高回报的热点。

而机器学习的任务,就是要在大数据量的基础上,发掘其中蕴含的有用信息。其处理的数据越多,机器学习就越能体现出优势,以前很多用机器学习解决不了或处理不好的问题,通过大数据可以得到很好解决,性能也会大幅提升,如语言识别、图像识别、天气预测等。1.3 机器学习、人工智能及深度学习“人工智能”和“机器学习”这两个科技术语如今广为流传,已成为当下的热词,然而,它们有何区别?又有哪些相同或相似的地方?虽然人工智能和机器学习高度相关,但却并不尽相同。

人工智能是计算机科学的一个分支,目的是开发一种拥有智能行为的机器,目前很多大公司都在努力开发这种机器学习技术,努力让计算机学会人类的行为模式,以便推动很多人眼中的下一场技术革命——让机器像人类一样“思考”。

过去10年,机器学习为我们带来了无人驾驶汽车、实用的语音识别、有效的网络搜索等。接下来它将如何改变我们的生活?在哪些领域最先发力?让我们拭目以待。

有一点需要注意,对很多机器学习来说,特征提取不是一件简单的事情。在一些复杂问题上,要想通过人工的方式设计有效的特征集合,往往要花费很多的时间和精力。

作为机器学习的一个分支,深度学习解决的核心问题之一就是自动将简单的特征组合成更加复杂的特征,并利用这些组合特征解决问题。它除了可以学习特征和任务之间的关联以外,还能自动从简单特征中提取更加复杂的特征。图1-2展示了深度学习和传统机器学习在流程上的差异。深度学习算法可以从数据中学习更加复杂的特征表达,使得最后一步权重学习变得更加简单且有效。图1-2 机器学习与深度学习流程对比

前面我们分别介绍了机器学习、人工智能及深度学习,那么它们的关系如何?

人工智能、机器学习和深度学习是紧密相关的几个领域。图1-3说明了它们之间的大致关系。人工智能是一类非常广泛的问题,机器学习是解决这类问题的一个重要手段,深度学习则是机器学习的一个分支。在很多人工智能问题上,深度学习的方法突破了传统机器学习方法的瓶颈,推动了人工智能领域的快速发展。图1-3 人工智能、机器学习与深度学习间的关系1.4 机器学习的基本任务

机器学习基于数据,并以此获取新知识、新技能。它的任务有很多,分类是其基本任务之一。所谓分类,就是将新数据划分到合适的类别中,一般用于类别型的目标特征。如果目标特征为连续型,则往往采用回归方法。回归是对新目标特征进行预测,是机器学习中使用非常广泛的方法之一。

分类和回归,都是先根据标签值或目标值建立模型或规则,然后利用这些带有目标值的数据形成的模型或规则,对新数据进行识别或预测。这两种方法都属于监督学习。与监督学习相对的是无监督学习,无监督学习不指定目标值或预先无法知道目标值,它可以把相似或相近的数据划分到相同的组里,聚类就是解决这一类问题的方法之一。

除了监督学习、无监督学习这两种最常见的任务外,还有半监督学习、强化学习等,这里我们就不展开了,图1-4展示了这些基本任务间的关系。图1-4 机器学习基本任务的关系1.5 如何选择合适算法

当我们接到一个数据分析或挖掘的任务或需求时,如果希望用机器学习来处理,首先要做的是根据任务或需求选择合适算法,选择算法的一般步骤如图1-5所示。图1-5 选择算法的一般步骤

充分了解数据及其特性,有助于我们更有效地选择机器学习算法。采用以上步骤在一定程度上可以缩小算法的选择范围,使我们少走些弯路,但在具体选择哪种算法方面,一般并不存在最好的算法或者可以给出最好结果的算法。在实际做项目的过程中,这个过程往往需要多次尝试,有时还要尝试不同算法。不过先用一种简单熟悉的方法,然后,在这个基础上不断优化,时常能收获意想不到的效果。1.6 Spark在机器学习方面的优势

在大数据基础上进行机器学习,需要处理全量数据并进行大量的迭代计算,这要求机器学习平台具备强大的处理能力。Spark与Hadoop兼容,它立足于内存计算,天然适用于迭代式计算。Spark是一个大数据计算平台,其具体有以下优势:

·完整的大数据生态系统:大家熟悉的SQL式操作组件Spark SQL,功能强大、性能优良的机器学习库Spark MLlib,用于图像处理的SparkGraphx及用于流式处理的SparkStreaming等。

·高性能的大数据计算平台:因为数据被加载到集群主机的分布式内存中,所以数据可以被快速转换迭代,并缓存后续的频繁访问需求。基于内存运算,Spark可以比Hadoop快100倍,在磁盘中运算也比Hadoop快10倍左右。

·与Hadoop、Hive、HBase等无缝连接:Spark可以直接访问Hadoop、Hive、HBase等的数据,同时也可使用Hadoop的资源管理器。

·易用、通用、好用:Spark编程非常高效、简洁,支持多种语言的API,如Scala、Java、Python、R、SQL等,同时提供类似于Shell的交互式开发环境REPL。1.7 小结

本章简单介绍了机器学习与大数据、人工智能的关系,同时也介绍了机器学习的一些基本任务和如何选择合适算法等问题。在选择机器学习平台时,我们着重介绍了Spark这样一个大数据平台的集大成者,它有很多优势,而且得到了很多企业的青睐。Spark是本书的主要介绍对象,下一章我们将介绍如何构建一个Spark机器学习系统。第2章构建Spark机器学习系统

构建机器学习系统的方法,根据业务需求和使用工具的不同,可能会有些区别,不过主要流程差别不大,基本包括数据抽取、数据探索、数据处理、建立模型、训练模型、评估模型、优化模型、部署模型等阶段。在构建系统前,我们需要考虑系统的扩展性,与其他系统的整合,系统升级及处理方式等。本章我们主要介绍基于Spark机器学习的架构设计或系统构建的一般步骤,以及需要注意的一些问题。

构建Spark机器学习系统的一般步骤如下:

·介绍系统架构

·启动集群

·加载数据

·探索数据

·数据预处理

·构建模型

·模型评估

·模型优化

·模型保存2.1 机器学习系统架构

Spark发展非常快,到我们着手编写本书时,Spark已升级为2.1版。自2.0以后,Spark大大增强了数据流水线的内容。数据流水线的思路与SKLearn非常相似,我想这种思路或许是未来的一个趋势,使机器学习的流程标准化、规范化、流程化,将很多原来需要自己编写的代码封装成可直接调用的模块或函数,模型评估、调优这些任务也可实现了更高的封装,大大降低机器学习的门槛。

Spark机器学习系统的架构图如图2-1所示,其中数据探索与预处理、训练及测试算法或建模阶段可以组装成流水线方式,模型评估及优化阶段可以采用自动化方式。图2-1 Spark机器学习系统的架构图2.2 启动集群

对于Spark集群的安装配置,这里不做详细介绍,对Spark集群的安装配置感兴趣的读者,可参考由我们编写的《自己动手做大数据系统》。

常见的Spark运行方式有本地模式、集群模式。本地模式所有的处理都运行在同一个JVM中,而后者,可以运行在不同节点上。具体运行模式如表2-1所示。表2-1 Spark运行模式

本书主要以Spark Standalone(独立模式)为例,如果想以其他模式运行,只要改动对应参数即可。

Spark支持Scala或Python的REPL(Read-Eval-Print-Loop,交互式shell)来进行交互式程序编写,交互式编程在输入的代码执行后立即能看到结果,非常友好和方便。

在2.0之前的Spark版本中,Spark shell会自动创建一个SparkContext对象sc。SparkContext与驱动程序(Driver Program)和集群管理器(Cluster Manager)间的关系如图2-2所示。图2-2 SparkContext与驱动程序、集群管理器间的关系图

从图2-2中可以看到,SparkContext起中介的作用,通过它来使用Spark其他的功能。每一个JVM都有一个对应的SparkContext,Driver Program通过SparkContext连接到集群管理器来实现对集群中任务的控制。Spark配置参数的设置以及对SQLContext、HiveContext和StreamingContext的控制也要通过SparkContext。

不过在Spark 2.0中引入了SparkSession对象(spark),运行Spark shell则会自动创建一个SparkSession对象,在输入spark时就会发现它已经存在了(见图2-3)、SparkConf、SparkContext和SQLContext都已经被封装在SparkSession当中,它为用户提供了一个统一的切入点,同时也提供了各种DataFrame和Dataset的API,大大降低了学习Spark的难度。图2-3 启动Spark shell界面

图2-3所示是启动Spark集群的界面,编程语言是Scala,如果希望使用Python为编辑语句,该如何启动呢?运行Pyspark即可,如图2-4所示。图2-4 启动PySpark的客户端2.3 加载数据

这里以MovieLens 100k(http://files.grouplens.org/datasets/movielens/ml-100k.zip)数据集中的用户数据(u.data)为例,首先在本地查看数据的基本信息,然后把本地文件复制到HDFS上,Spark或PySpark会读取HDFS上的数据。

查看u.user文件的基本信息、数据样例、总记录数等信息。$ head -3 u.user1|24|M|technician|857112|53|F|other|940433|23|M|writer|32067$ cat u.user |wc -l943

u.user用户数据每列的含义为user id|age|gender|occupation|zip code,即用户ID、用户年龄、用户性别、用户职位、所在地邮编等信息,列间的分隔符为竖线(|),共有943条记录。

如何把用户信息复制到HDFS上?首先,查看当前HDFS的目录信息。$ hadoop fs -ls /u01/bigdata/Found 2 itemsdrwxr-xr-x - hadoop supergroup 0 2017-02-07 03:20 /u01/bigdata/datadrwxr-xr-x - hadoop supergroup 0 2016-07-20 09:16 /u01/bigdata/hive

由此可知在HDFS上已有/u01/bigdata/data目录(如果没有目录可以通过hadoop fs-mkdire/u01/bigdata/data命令创建。),通过以下命令,把本地文件u.user复制到HDFS上。$ hadoop fs -put u.user /u01/bigdata/data//查看HDFS上的文件$ hadoop fs -ls /u01/bigdata/data-rw-r--r-- 1 hadoop supergroup 22628 2017-03-18 13:37 /u01/bigdata/data/u.user

把电影评级数据(u.data)、电影数据(u.item)等复制到HDFS的方法与上边的相同,把本地数据复制到HDFS后,Spark如何读取加载HDFS上的文件?我们可以通过Spark的textFile方法读取。这里我们以PySpark为例,启动PySpark客户端,导入需要的包,然后通过textFile方法读取HDFS上的数据,具体请看以下示例:###以Spark独立模式启动Pyspark客户端pyspark --master spark://master:7077 --driver-memory 1G --total-executor-cores 2###导入需要的包from pyspark.sql import SparkSessionfrom pyspark.sql import Row##初始化sparkSessionspark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config("spark.some.config.option", "some-value") \ .getOrCreate()###加载数据,并处理分割符数据sc = spark.sparkContextuserrdd = sc.textFile("hdfs://master:9000/u01/bigdata/data/u.user").map(lambda line: line.split("|"))###利用反射机制推断模式(Schema),把dataframe注册为一个tabledf = userrdd.map(lambda fields: Row(userid=fields[0], age=int(fields[1]),gender=fields[2],occupation=fields[3],zip=fields[4]))schemauser = spark.createDataFrame(df)schemauser.createOrReplaceTempView("user")2.4 探索数据

生产环境中数据往往包含很多脏数据,如缺失数据、不一致、不规范、奇异数据等,所以在数据加载后且在数据建模前,需要对数据进行分析或探索,尤其面对大数据,了解数据的统计信息、数据质量、数据特征等,为数据处理、数据建模提供重要依据。在进行这些数据分析时,如果能实现数据的可视化,当然更利于我们理解数据。2.4.1 数据统计信息

加载数据后,首先要关注数据的统计信息,有了数据统计信息,我们对数据就有了一个大致了解,如数据特征的最大值、最小值、平均值、分位数、方差等。这些信息有助于我们理解数据质量、数据构成,为数据预处理提供重要依据。查看用户各字段的统计信息:schemauser.describe("userid","age", "gender","occupation","zip").show()+-------+-----------------+-----------------+------+-------------+------------------+|summary| userid| age|gender| occupation| zip|+-------+-----------------+-----------------+------+-------------+------------------+| count| 943| 943| 943| 943| 943|| mean| 472.0|34.05196182396607| null| null| 50868.78810810811|| stddev|272.3649512449549|12.19273973305903| null| null|30891.373254138158|| min| 1| 7| F|administrator| 00000|| max| 99| 73| M| writer| Y1A6B|+-------+-----------------+-----------------+------+-------------+------------------+

从以上统计可以看出,用户表总记录数为943条,年龄最小为7岁,最大为73岁,平均年龄为34岁。2.4.2 数据质量分析

数据质量分析是数据探索阶段重要一环,数据不是完美的,往往存在缺少数据的情况,还可能包含不一致数据、异常数据、噪声数据等。没有可信的数据,再好的模型性能都不太可能好,正所谓“垃圾进,垃圾出”。

数据质量方面的分析,主要包括以下几个方面:

·缺失值;

·异常值;

·不一致的值;

·错误数据。

本节以一份某酒店的销售额数据为例,来说明在数据探索中,对数据质量的一般分析方法。##以Spark独立模式,启动Pyspark客户端pyspark --master spark://master:7077 --driver-memory 1G --total-executor-cores 2###导入需要的库import pandas as pdimport matplotlib.pyplot as plt ###加载数据,使用标题行df=pd.read_csv("/home/hadoop/data/catering_sale.csv",header=0)##查看df的统计信息df.count() ##统计非空值记录数sale_date 200sale_amt 198 ###说明sale_amt有两个空值df.describe() ###获取df的统计信息 sale_amtcount 198.000000mean 2765.545152std 709.557639min 22.00000025% 2452.72500050% 2655.85000075% 3023.500000max 9106.440000#建立图像plt.figure()#画箱线图bp = df.boxplot()# flies为异常值的标签x = bp['fliers'][0].get_xdata()y = bp['fliers'][0].get_ydata()y.sort() #用annotate添加注释for i in range(len(x)): plt.annotate(y[i], xy = (x[i],y[i]), xytext=(x[i]+0.1-0.8/(y[i]-y[i-1]),y[i]))plt.show()

运行结果如图2-5所示。图2-5 销售额箱线图检测异常值

从以上分析可知,销售额列存在两个空值、六个可能的异常值,其中865.0和1060.0有可能属于正常值,当然也需要和相关业务员沟通,对于其他数值,需要进一步分析异常值产生的原因,然后确定数据的去留。2.4.3 数据特征分析

对数据质量有基本了解后,接下来就可以对数据的特征进行分析,数据特征分析一般包括以下内容:

·特征分布分析;

·对比分析;

·统计量分析。

特征一般指用于模型训练的变量,原始数据中的特征有些是数值,有些是字符或其他格式信息,但在进行机器学习前,都需要转换为数值。根据实际情况,有时需要根据已有特征生成或衍生出新特征,如根据用户年龄衍生出表示老、中、青的新特征;有时需要对一些特征进行规范化、标准化等转换,尤其是回归类模型。

1.数据特征分析实例

特征的分布分析有助于发现相关数据的分布特征、分布类型、分布是否对称等,可以使用数据可视化方法,这样便于直观发现特征的异常值。下面以用户信息数据为例,分析用户的年龄特征、职业特征等。from pyspark.sql import SparkSessionfrom pyspark.sql import Rowspark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config("spark.some.config.option", "some-value") \ .getOrCreate()sc = spark.sparkContext# 加载textfile文件并转换为行式userrdd = sc.textFile("hdfs://master:9000/u01/bigdata/data/u.user").map(lambda line: line.split("|"))#利用反射机制把RDD转换为DataFramedf = userrdd.map(lambda fields: Row(name=fields[0], age=int(fields[1]),gender=fields[2],occupation=fields[3],zip=fields[4]))# 把dataframe注册为一个tableschemauser = spark.createDataFrame(df)schemauser.createOrReplaceTempView("user")# 在table上运行SQLage = spark.sql("SELECT age FROM user")#把运行结果转换为RDDages = age.rdd.map(lambda p: p.age).collect()hist(ages, bins=20, color='lightblue', normed=True)

运行结果如图2-6所示。

从图2-6可以看出,最小年龄在10岁左右,最大年龄超过70岁,大部分是20~40岁。图2-6 用户年龄特征分布图

我们还可以进一步分析用户职业分布特征。# 选取用户职业数据count_occp = spark.sql("SELECT occupation,count(occupation) as cnt FROM user Group by occupation order by cnt")#查看前5行数据count_occp.show(5)+----------+---+|occupation|cnt|+----------+---+| homemaker| 7|| doctor| 7|| none| 9|| lawyer| 12|| salesman| 12|+----------+---+#获取职业名称及职业数,以便画出各职业总数图形#把运行结果转换为RDDx_axis = count_occp.rdd.map(lambda p: p.occupation).collect()y_axis = count_occp.rdd.map(lambda p: p.cnt).collect()pos = np.arange(len(x_axis))width = 1.0###隐式新增一个figure,或为当前figure新增一个axesax = plt.axes() ax.set_xticks(pos + (width / 2)) ###设置x轴刻度ax.set_xticklabels(x_axis) ####为对应刻度打上标签plt.bar(pos, y_axis, width, color='orange')plt.xticks(rotation=30) ####x轴上的标签旋转30度fig = matplotlib.pyplot.gcf() ###获取当前figure的应用fig.set_size_inches(16, 10) ###设置当前figure大小

从图2-7所示的用户职业分布图可以看出,学生占绝大多数,其次是其他职业从业者、教育工作者、管理者、工程师等。医生、家庭主妇或许平时较忙,故数量比较少。图2-7 用户职业分布图

2.特征分布及相关性分析

在数据探索阶段分析特征分布、特征间的相关性等,将为后续的特征选择、特征提取将提供重要依据。以下是对共享单车数据的特征分析,详细内容可参考9.3节。###探索特征间分布、相关性等import pandas as pdimport seaborn as snsimport matplotlib.pyplot as pltdata_pd=data1.toPandas()sns.set(style='whitegrid',context='notebook')cols=['temp','atemp','label']sns.pairplot(data_pd[cols],size=2.5)plt.show()

运行结果如图2-8所示。

3.对比分析

下面以销售数据为例进行对比分析,运行结果如图2-9所示。图2-8 hours数据集特征分布及相关性示例图###导入需要的库import pandas as pd###把日期列作为索引,并转换为日期格式df = pd.read_csv("/home/hadoop/data/catering_sale.csv",header=0,index_col='sale_date',parse_dates=True)###把空值置为0df1=df.fillna(0)###根据年月求和df_ym=df1.resample('M',how='sum')##取年月df2=df_ym.to_period('M')##数据可视化df2.plot(kind='bar',rot=30)图2-9 销售月份对比图2.4.4 数据的可视化

数据的可视化是数据探索、数据分析中的重要任务,通过可视化可以帮助我们发现数据的异常值、特征的分布情况等,为数据预处理提供重要支持。Spark目前对数据的可视化功能还很弱或还没有,不过没关系,我们可以借助Python或R等可视化功能,Python和R在数据可视化方面功能很强大,这里以Python的数据可视化为例。Python的数据表现能力很强,其可视化可以使用matplotlib或plot等方法。matplotlib是一种比较低级但强大的绘图工具,可以进行很多定制化,但往往需要较大代码来实现;plot是一种非常简洁的绘图工具,它主要基于pandas基础之上。接下来我们通过两个示例来具体说明。2

下例是通过matplotlib可视化sin(x)和cos(x)函数的图形。# -*- coding: utf-8 -*-import numpy as npimport matplotlibimport matplotlib.pyplot as pltplt.rcParams['font.sans-serif']=['SimHei'] ###显示中文plt.rcParams['axes.unicode_minus']=False ##防止坐标轴上的"-"号变为方块x = np.linspace(0, 10, 100)y = np.sin(x)y1 = np.cos(x)##绘制一个图,长为10,宽为6(默认值是每个单位80像素)plt.figure(figsize=(10,6))###在图列中自动显示$间内容plt.plot(x,y,label="$sin(x)$",color="red",linewidth=2)plt.plot(x,y1,"b--",label="$cos(x^2)$") ###b(blue),--线形plt.xlabel(u"X值") ##X坐标名称,u表示unicode编码plt.ylabel(u"Y值")plt.title(u"三角函数图像") ##t图名称plt.ylim(-1.2,1.2) ##y上的max、min值plt.legend() ##显示图例plt.savefig('fig01.png') ##保持到当前目录plt.show()

运行结果如图2-10所示。图2-10 matplot数据可视化

同样的,如果我们使用plot对这些数据来进行可视化,代码可以非常简洁,但定制化方面可能要弱一些。from pandas import DataFrameimport pandas as pdimport numpy as npx = np.linspace(0, 10, 100)df=DataFrame({'sin(x)':np.sin(x),'cos(x)':np.cos(x)},index=x)df.plot()

显示图形如图2-11所示。图2-11 plot数据可视化

从以上实现代码可以看出,如果使用plot进行数据可视化则非常简单,虽然定制化要比matplotlib少些,但其可定制的项也不少,如kind、rot、title、legend等。2.5 数据预处理

前面我们介绍了探索数据的一些方法,通过对数据的探索,可以帮助我们发现一些奇异数据、缺失数据、特征的类别及其分布情况等信息。而这些信息正是对数据预处理的重要依据。在数据分析、机器学习中,数据的预处理非常关键,尤其是涉及大数据的处理,往往是比较费时、费神的过程,有时还需要往返多次。当然,如果数据预处理做得好,除提高数据质量外,更能极大提高模型的性能。

数据的预处理一般包括数据清理、数据变换、数据集成、数据归约等,如图2-12所示。图2-12 数据预处理示意图2.5.1 数据清理

数据清理的主要任务是填补缺失值、光滑噪声数据、处理奇异数据、纠正错误数据、删除重复数据、删除唯一性属性、去除不相关字段或特征、处理不一致数据等。噪声数据的处理方法为分箱、聚类等。以下分别以处理缺失数据、异常数据为例,说明在Spark中如何进行数据清理。

1.处理缺失数据import pandas as pd##读取HDFS上的数据df=pd.read_csv("/home/hadoop/data/catering_sale.csv",header=0)##定位数据集中的空值df[df.isnull().values==True]##显示结果如下,说明有2个空值 sale_date sale_amt13 2015/2/14 NaN32 2015/1/26 NaN###以0填补空值df.fillna(0)##或该列的平均值填补空值df['sale_amt'].fillna(df['sale_amt'].count())##或用该列前一行值填补空值df.fillna(method='pad')

2.处理奇异数据

在数据探索阶段,我们发现销售数据文件catering_sale.csv中有6个可能的奇异数据,假设与相关人员核实后,只有22.0为奇异数据或错误数据。对错误数据我们一般采用删除或替换的方法,这里我们采用Spark SQL来处理奇异数据。

首先把数据复制到HDFS,用Spark读取数据,如果启动Pyspark,则可以通过spark.read.csv(“/home/hadoop/data/catering_sale.csv”,header=True)读取;如果启动spark-shell,则可以采用spark.read.option(“header”,”true”).csv(“hdfs://192.168.1.112:9000/home/hadoop/data/catering_sale.csv”)的方式读取。#读取CSV文件,保留文件标题,并创建Spark 的一张derby数据库的表df=spark.read.csv("/home/hadoop/data/catering_sale.csv",header=True)##转换数据类型df1=df.select(df['sale_date'],df['sale_amt'].cast("Double"))###假设把22.0奇异数据替换为200.0df1.replace(22.0,200.0,'sale_amt')

这里我们使用了DataFrame的select、replace等方法,实际上df还有很多可利用的方法或函数,可以通过df.+Tab键查看,如图2-13所示。

这些方法或函数的具体使用,可以通过df.方法名的方式查看,图2-14所示为查看df.filter的详细用法。图2-13 其他可利用的方法或函数图2-14 查看df.filter的使用方法

此外,我们还可以使用大量spark.sql.functions或pyspark.sql.functions函数,以下是使用去除字段左右空格、截取字段长度等内置函数的示例:from pyspark.sql.functions import *###去空格df.select(trim(df.sale_date)).show()###去年份df.select(substring(df.sale_date,1,4).alias('year'),df.sale_amt).show()2.5.2 数据变换

数据变换是数据预处理中的一项重要内容,如对数据进行规范化、离散化、衍生指标、类别特征数值化、平滑噪声等都属于数据变换。在Spark ML中有很多现成的数据变换算法,利用这些算法可极大提高整个数据处理的效率,表2-2所示只是概况,更多详细信息请参考第4章。表2-2 Spark ML自带的数据变换算法

这里我们以卡方检验为例,分析如何根据特征的贡献率来选择特征。假设我们有很多特征,如表示时间特征的季节(season)、年月(yr)、月份(mnth)、是否节假日(holiday)、是否周末(weekday),表示天气的特征weathersit、temp等。为了使用卡方检验来选择这些特征,首先需要把各特征组合为一个特征向量,然后,把整合后的特征向量及选择特征个数等代入卡方模型中,详细代码如下://定义特征向量featuresArray =["season","yr","mnth","hr","holiday","weekday","workingday","weathersit","temp","atemp","hum","windspeed"]###把各特征组合成特征向量featuresassembler = VectorAssembler(inputCols=featuresArray,outputCol="features")###选择贡献度较大的前5个特征selectorfeature = ChiSqSelector(numTopFeatures=5, featuresCol="features",outputCol="selectedFeatures", labelCol="label")2.5.3 数据集成

数据集成是将多文件或者多数据库中的数据进行合并,然后存放在一个一致的数据存储中。数据集成一般通过join、union或merge等关键字把两个(或多个)数据集连接在一起,Spark SQL(包括DataFrame)有join方法,Pandas下有merge方法。数据集成往往需要耗费很多资源,尤其是大数据间的集成涉及shuffle过程,有时需要牵涉多个节点,所以除了数据一致性外,性能问题常常不请自来,需要我们特别留心。

传统数据库一般是在单机上采用hash join方法,如果在分布式环境中,采用join时,可以考虑充分利用分布式资源进行平行化。当然,在进行join之前,对数据过滤或归约也是常用的优化方法。

Spark SQL中有三种join方法:

·broadcast hash join:如果join的表中有一张大表和一张较小的表,可以考虑把小表广播分发到大表所在的分区节点上,分别并发地与其上的分区记录进行hash join。

·shuffle hash join:如果两张表都不小,对数据量较大的表进行广播分发就不太适合。这种情况下,可以根据join key相同必然分区相同的原理,将两张表分别按照join key进行重新组织分区,这样就可以将join分而治之,划分为很多小join,充分利用集群资源并行化。

·sort merge join:对数据量较大的表也可以考虑使用sort merge join方法,先将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理,然后,对单个分区节点的两表数据分别进行排序,最后,对排好序的两张分区表数据执行join操作。

DataFrame中join的方式有(或merge)内连接、左连接、右连接等。2.5.4 数据归约

大数据是机器学习的基础,但大数据往往数据量非常大,有时我们可以通过数据归约技术删除或减少冗余属性(或维)、精简数据集等,使归约后的数据比原数据小,甚至小很多,但仍然接近于保持原数据的完整性,且结果与归约前结果相同或几乎相同。表2-3所示列举了Spark ML自带的特征选择或降维算法。表2-3 Spark ML自带的数据选择算法及功能简介

选择特征或降维是机器学习中重要的处理方法,我们可以使用上述这些方法在减少特征个数、消除噪声等问题的同时,维持原始数据的内在结构或主要特征。尤其是降维,在大数据、机器学习中具有重要作用,以下通过两个实例说明SVD、PCA算法的具体使用(目前Spark MLlib支持SVD及PCA)。import org.apache.spark.mllib.linalg.Matriximport org.apache.spark.mllib.linalg.SingularValueDecompositionimport org.apache.spark.mllib.linalg.Vectorimport org.apache.spark.mllib.linalg.Vectorsimport org.apache.spark.mllib.linalg.distributed.RowMatrixval data = Array(Vectors.dense(1,2,3,4,5,6,7,8,9),Vectors.dense(5,6,7,8,9,0,8,6,7),Vectors.dense(9,0,8,7,1,4,3,2,1),Vectors.dense(6,4,2,1,3,4,2,1,5),Vectors.dense(4,5,7,1,4,0,2,1,8))val dataRDD = sc.parallelize(data, 2)val mat: RowMatrix = new RowMatrix(dataRDD)//保留前3个奇异值,需要获得U成员val svd = mat.computeSVD(3, computeU = true)//通过访问svd对象的V、s、U成员分别拿到进行SVD分解后的右奇异矩阵、奇异值向量和左奇异矩阵val U: RowMatrix = svd.U //左奇异矩阵val s: Vector = svd.s //从大到小的奇异值向量[30.88197557931219,10.848035248251415,8.201924156089822]val V: Matrix = svd.V //右奇异矩阵-0.33309047675110115 0.6307611082680837 0.10881297540284612-0.252559026169606 -0.13320654554805747 0.4862541277385016-0.3913180354223819 0.3985110846022322 0.20656596253983592-0.33266751598925126 0.25621153877501424 -0.3575093420454635-0.35120996186827147 -0.24679309180949208 0.16775460006130793-0.1811460330545444 0.03808707142157401 -0.46853660508460787-0.35275045425261 -0.19100365291846758 -0.26646095393100677-0.2938422406906167 -0.30376401501983874 -0.4274842789454556-0.44105410502598985 -0.4108875465911952 0.2825275707788212

同样是这个矩阵data,以下我们用PCA进行分解,看一下效果及与SVD的异同。SVD分解后右奇异矩阵V与PCA降维后的矩阵pc很相似。import org.apache.spark.mllib.linalg.Matriximport org.apache.spark.mllib.linalg.Vectorsimport org.apache.spark.mllib.linalg.distributed.RowMatrixval data = Array(Vectors.dense(1,2,3,4,5,6,7,8,9),Vectors.dense(5,6,7,8,9,0,8,6,7),Vectors.dense(9,0,8,7,1,4,3,2,1),Vectors.dense(6,4,2,1,3,4,2,1,5),Vectors.dense(4,5,7,1,4,0,2,1,8))val dataRDD = sc.parallelize(data, 2)val mat: RowMatrix = new RowMatrix(dataRDD)val pc: Matrix = mat.computePrincipalComponents(3)-0.3948204553820511 -0.3255749878678745 0.10573757539268940.1967741975874508 0.12066915005125914 0.4698636365472036-0.09206257474269655 -0.407047128194367 0.32100955550217590.12315980051885281 -0.6783914405694824 -0.100490655630021310.43871546256175087 -0.12704705411702932 0.2775911848440697-0.05209780173017968 0.10583033338605327 -0.64736976928067370.422474587406277 -0.27600606797384 -0.139091372083387070.46536643478632944 -0.172268807944553 -0.3497316537914160.4376262507870099 0.3469015236606571 0.13076351966313637

使用PCA降维,利用Pyspark的画图功能,可以对新生成的特征的方差贡献度进行可视化,图2-15所示为对hour.csv数据通过PCA处理后,重要特征的排序情况。

试读结束[说明:试读内容隐藏了图片]

下载完整电子书


相关推荐

最新文章


© 2020 txtepub下载