Storm技术内幕与大数据实践(txt+pdf+epub+mobi电子书下载)


发布时间:2021-02-28 20:03:19

点击下载

作者:陈敏敏,王新春,黄奉线

出版社:人民邮电出版社

格式: AZW3, DOCX, EPUB, MOBI, PDF, TXT

Storm技术内幕与大数据实践

Storm技术内幕与大数据实践试读:

前言

本书意在介绍实时大数据的各个方面,分享我们在设计实时应用过程中遇到的一些问题,让一些从零开始构建实时计算平台的公司少走弯路。我们力图使不同背景的读者都能从其中获益。

如果你从事基础架构方面的工作,可以着重阅读以下几章:在第1章中,我们整理了国内主要互联网公司在Storm应用方面的一些情况;在第2章中,我们介绍了实时平台的总体架构,随后引入了大众点评和1号店目前实时平台的一些基本情况;在第4章中,我们给出了源码剖析,为了让不懂Clojure语言的读者也能容易地理解Storm的内部原理,我们配了很多顺序图来描述调用逻辑;在第5章中,我们分享了一些在实践中总结出来的监控Storm应用的常用方法;在第6章中,我们介绍了在 Storm 上如何做一些扩展,方便更好地维护和管理集群;在第 10章中,我们主要分享了Storm的一些小技巧和性能优化的经验。

如果你是大数据产品的开发和架构人员,可以着重阅读后面的几章,其中分享了我们一年来遇到的一些瓶颈。

如果你是算法工程师,可以着重了解第8章和第9章,里面的用户生命周期模型、实时推荐系统的算法和架构、千人千面架构等不少内容来自于我们的生产实践。设计严谨的模型在实时系统上往往会遇到比较大的性能问题,数据量、实时和算法的精准性是相互制约的,提高某一方面,往往不得不牺牲另外两个指标。在实际推荐系统的生产环境中,关联规则和协同过滤的推荐效果往往比较好,被广泛采用,而利用用户画像,结合地域、天气等上下文信息,可以进行一些更加精准的推荐。目前基于用户画像和上下文内容做个性化推荐和搜索、精准化运营和广告营销等提高交易额等转换率,也是很多公司尝试的方向。

对于网上有的或者其他书中介绍过的内容,为适应不同读者的需求,我们会简单提及以做一点点过渡。

尽管我们投入了大量的精力来写这本书,但因为水平所限,书中的内容存在不足和疏漏也在所难免,恳请读者批评指正。如果读者对本书有什么建议,欢迎发送邮件至邮箱xiaochen_0260@qq.com,期待得到真挚的反馈。

第1章 绪论

Apache Storm(http://storm.apache.org/)是由Twitter开源的分布式实时计算系统。Storm可以非常容易并且可靠地处理无限的数据流。对比Hadoop的批处理,Storm是一个实时的、分布式的、具备高容错的计算系统。Storm应用可以使用何编程语言来进行开发,并且非常有趣。

Storm的使用场景非常广泛,比如实时分析、在线机器学习、分布式RPC、ETL等。Storm非常高效,在一个多节点集群上每秒钟可以轻松处理上百万条的消息。Storm还具有良好的可扩展性和容错性以及保证数据可以至少被处理一次等特性。

图1-1中水龙头和后面水管组成的拓扑图就是一个Storm应用(Topology),其中的水龙头是 Spout,用来源源不断地读取消息并发送出去,水管的每一个转接口就是一个 Bolt,通过Stream分组的策略转发消息流。图1-1 Topology图(来源http://storm.apache.org/)

1.1 Storm的基本组件

1.1.1 集群组成

Storm的集群表面上看和Hadoop的集群非常像。但是在Hadoop上运行的是MapReduce的作业(job),而在 Storm上运行的是Topology。Storm和Hadoop一个非常关键的区别是Hadoop的MapReduce作业最终会结束,而Storm的Topology会一直运行(除非显式地杀掉它)。

如果说批处理的 Hadoop 需要一桶桶地搬走水,那么 Storm 就好比自来水水管,只要预先接好水管,然后打开水龙头,水就源源不断地流出来了,即消息就会被实时地处理。

在 Storm 的集群中有两种节点:主节点(Master Node)Nimbus 和工作节点(Worker Node)Supervisor。Nimbus的作用类似于Hadoop中的JobTracker,Nimbus负责在集群中分发代码,分配工作给机器,并且监控状态。每个工作节点上运行一个Supervisor进程(类似于TaskTracker)。Supervisor会监听Nimbus分配给那台机器的工作,根据需要启动/关闭具体的Worker进程。每个Worker进程执行一个具体的Topology,Worker进程中的执行线程称为Executor,可以有一个或者多个。每个Executor中又可以包含一个或者多个Task。Task为Storm中最小的处理单元。一个运行的Topology由运行在一台或者多台工作节点上的Worker进程来完成具体的业务执行。Storm组件和Hadoop组件的对比参见表1-1。表1-1 Storm组件和Hadoop组件对比

Nimbus和Supervisor之间的通信依靠ZooKeeper完成,并且Nimbus进程和Supervisor都是快速失败(fail-fast)和无状态的,所有的状态要么在ZooKeeper里面,要么在本地磁盘上。这也就意味着你可以用kill -9来杀死Nimbus 和Supervisor 进程,然后再重启它们,它们可以继续工作,就好像什么都没有发生过似的。这个设计使得Storm具有非常高的稳定性。Storm的基本体系架构参见图1-2。图1-2 Storm基本体系架构

1.1.2 核心概念

在Storm中有一些核心基本概念,包括Topology、Nimbus、Supervisor、Worker、Executor、Task、Spout、Bolt、Tuple、Stream、Stream分组(grouping)等,如表1-2所示。表1-2 Storm组件基本概念图1-3 Spout工作示意图图1-4 Bolt工作示意图

在Storm中有7种内置的分组方式,也可以通过实现CustomStreamGrouping接口来定义自己的分组。(1)Shuffle分组:Task中的数据随机分配,可以保证同一级Bolt上的每个Task处理的Tuple数量一致,如图1-5所示。图1-5 Shuffle分组随机分配模式(2)Fields 分组:根据 Tuple 中的某一个Filed或者多个Filed的值来划分。比如Stream根据user-id的值来分组,具有相同user-id值的Tuple会被分发到相同的Task中,如图1-6所示。(具有不同user-id值的Tuple可能会被分发到其他Task中。比如 user-id为1的Tuple都会分发给Task1,user-id为2的Tuple可能在Task1上也可能在Task2上,但是同时只能在一个Task上。)图1-6 Fields分组哈希分布模式(3)All分组:所有的Tuple都会到分发到所有的Task上,如图1-7所示。图1-7 All分组全量发送模式(4)Global分组:整个Stream会选择一个Task作为分发的目的地,通常是具有最新ID的Task,如图1-8所示。图1-8 Global 分组单选发送模式(5)None分组:也就是你不关心如何在Task中做Stream的分发,目前等同于Shuffle分组。(6)Direct分组:这是一种特殊的分组方式,也就是产生数据的Spout/Bolt自己明确决定这个Tuple被Bolt的哪些Task所消费。如果使用Direct分组,需要使用OutputCollector的emitDirect方法来实现。(7)Local or shuffle分组:如果目标Bolt中的一个或者多个 Task和当前产生数据的 Task在同一个 Worker进程中,那么就走内部的线程间通信,将Tuple直接发给在当前Worker进程中的目的Task。否则,同Shuffle分组。

1.1.3 Storm的可靠性

Storm 允许用户在 Spout 中发射一个新的 Tuple 时为其指定一个 MessageId,这个MessageId 可以是任意的Object对象。多个Stream Tuple可以共用同一个MessageId,表示这多个Stream Tuple对用户来说是同一个消息单元。Storm的可靠性是指Storm会告知用户每一个消息单元是否在一个指定的时间内被完全处理。完全处理的意思是该 MessageId绑定的 Stream Tuple 以及由该 Stream Tuple 衍生的所有 Tuple 都经过了 Topology 中每一个应该到达的Bolt的处理。在Storm中,使用Acker来解决Tuple消息处理的可靠性问题。

1.1.4 Storm的特性

总结起来,Storm具有如下优点。

• 易用性:开发非常迅速,容易上手。只要遵守Topology、Spout和Bolt的编程规范即可开发出扩展性极好的应用。对于底层 RPC、Worke r 之间冗余以及数据分流之类的操作,开发者完全不用考虑。

• 容错性:Storm 的守护进程(Nimbus、Supervisor 等)都是无状态的,状态保存在ZooKeeper中,可以随意重启。当Worker失效或机器出现故障时,Storm自动分配新的Worker替换失效的Worker。

• 扩展性:当某一级处理单元速度不够,可以直接配置并发数,即可线性地扩展性能。

• 完整性:采用Acker机制,保证数据不丢失;采用事务机制,保证数据准确性。

由于Storm具有诸多优点,使用的业务领域和场景也越来越广泛。

1.2 其他流式处理框架

1.2.1 Apache S4

Apache S4(http://incubator.apache.org/s4/)是由 Yahoo 开源的多用途、分布式的、可伸缩的、容错的、可插入式的实时数据流计算平台。

S4填补了复杂的专有系统和面向批处理的开源计算平台之间的差距。其目标是开发一个高性能计算平台,对应用程序开发者隐藏并行处理系统固有的复杂性。S4已经在Yahoo!系统中大规模使用,目前最新版本是0.6.0。

S4相对于Storm在可靠性和容错性上差一些,S4不保证完全不丢失数据。在用户活跃度上S4也要差一些。

1.2.2 Spark Streaming

Spark 是 UC Berkeley AMP Lab 开源的类 Hadoop MapReduce 的通用的并行计算框架。Spark 基于 MapReduce 算法实现的分布式计算拥有 Hadoop MapReduce 所具有的优点,但不同于 MapReduce 的是,作业中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

Spark Streaming是建立在Spark上的实时计算框架,通过它提供的API和基于内存的高速执行引擎,用户可以结合流式、批处理和交互式进行查询和实时计算。Spark Streaming的基本的原理是将Stream数据分成小的时间片断(几秒钟到几分钟),以类似batch批量处理的方式来处理这些小部分数据。Spark Streaming 构建在 Spark 上,一方面是因为 Spark的低延迟执行引擎可以用于实时计算,另一方面相比基于 Record 的其他处理框架(如Storm),弹性分布式数据集(Resilient Distributed Datasets,RDD)更容易实现高效的容错处理。此外,小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法,方便了一些需要历史数据和实时数据联合分析的特定应用场合。

Spark Streaming 和Storm两个框架都提供了可扩展性和容错性,它们根本的区别在于它们的处理模型。Storm处理的是每次传入的一个事件,而Spark Streaming是处理某个时间段窗口内的事件流。因此,Storm处理一个事件可以达到极低的延迟,而Spark Streaming的延迟相对较高。

1.2.3 流计算和Storm的应用

大数据的价值在各行各业中得到了广泛使用。针对离线处理,Hadoop已经成为事实上的标准;针对数据实时处理的需求,目前涌现出了许多平台和解决方案。以下汇总了截至2014年流计算和Storm的使用情况。

1.新浪的实时分析平台

新浪实时分析平台的计算引擎是 Storm,整个实时计算平台包括可视化的任务提交Portal界面、对实时计算任务的管理监控平台以及核心处理实时计算平台。

Storm作为核心处理,待处理数据来源为Kafka。对于实时性要求比较高的应用、数据会直接发送到 Kafka,然后由 Storm 中的应用进行实时分析处理;而对实时性要求不太高的应用,则由Scribe收集数据,然后转发到Kafka中,再由Storm进行处理。

任务提交到Portal之前,作业的提交者需要确定数据源、数据的每个处理逻辑,同时确定处理完成后数据的存储、获取和展示方式。在任务提交后,可以完成对任务的管理:编辑、停止、暂停和恢复等。

整个核心处理平台提供了一些通用的模块,如数据的解析(不同的应用有不同的数据格式,可以是简单的分隔符分隔和正则表达式)、对特定字段的TopN计数以及数据的过滤和去重,数据处理过程中使用到了缓存Redis,支持多种存储方式(数据处理完成后可选择的持久化方式有HBase、HDFS、本地文件和MySQL等)。

在应用上,实时分析平台的应用包括HTTP日志分析、PV计算等。

在监控上,通过Storm的Nimbus节点,获取集群的运行数据,结合JMX收集到进程状态信息,将数据发送到统一的监控工具中(如Ganglia)。

2.腾讯的实时计算平台

腾讯的实时计算平台Tencent Real-time Computing主要由两部分组成:分布式K/V 存储引擎TDEngine和支持数据流计算的TDProcess。TDProcess是基于Storm的计算引擎,提供了通用的计算模型,如Sum、Count、PV/UV计算和TopK统计等。整个平台修复了运行中发现的Storm的问题,同时引入YARN进行资源管理。

据称,整个计算平台每天承载了超过1000亿数据量的计算,支持广点通、微信、视频、易迅、秒级监控、电商和互娱等业务上百个实时统计的需求。

3.奇虎360实时平台

奇虎360从2012年开始引入Storm,Storm主要应用场景包括云盘缩略图、日志实时分析、搜索热词推荐、在线验证码识别、实时网络入侵检测等包括网页、图片、安全等应用。在部署中,使用了 CGroup 进行资源隔离,并向 Storm 提交了很多补丁,如 log UI (https://github.com/nathanmarz/storm/pull/598)等。在部署上,Storm 集群复用了其他机器的空闲资源(Storm 部署在其他服务的服务器上,每台机器贡献1~2 核处理器、1~2 GB 内存),整个规模达到60多个集群,15 000多台物理机,服务于170多个业务。每天处理数据量约几百TB、几百亿条记录。

4.京东的实时平台

京东的实时平台基于LinkedIn开源的Samza,整个Samza包括流处理层Kafka,执行层YARN和处理层Samza API。一个流式处理由一个或多个作业组成,作业之间的信息交互借助Kafka实现,一个作业在运行状态表现为一个或者多个Task,整个处理过程实际上是在Task中完成的。在Samza中,Kafka主要的角色是消息的缓冲、作业交互信息的存储,同一个业务流程中使用YARN进行任务调度。在其整个架构中,引入了Redis作为数据处理结果的存储,并通过Comet技术将实时分析的数据推送到前台展示,整个业务主要应用于京东大家电的订单处理,实时分析统计出待定区域中各个状态的订单量(包括待定位、待派工、待拣货、待发货、待配送、待妥投等)。

5.百度的实时系统

相对而言,百度在实时系统上开展的比较早,在其流计算平台DStream开发时业界尚未有类似的开源系统。截至2014年,从公开的资料可以发现,DStream平台的集群规模已超千台,单集群最大处理数据量超过50 TB/天,集群峰值QPS 193W/S,系统稳定性、计算能力已完全满足海量数据时效性处理需求。另一个平台TM则保证数据不重不丢,主要用于报表生成系统、计费流计算等。

6.阿里巴巴团队的JStorm

JStorm(https://github.com/alibaba/jstorm)是阿里巴巴团队基于 Storm 二次开发的, Spout/Bolt等接口的使用方式和Storm保持完全一致,在Storm上开发和运行的代码可以一行不修改就运行在JStorm上。Storm的内核是Clojure编写,JStorm完全用Java重写。JStorm还提供了一些Storm没有的特性。

• Nimbus实现HA:当一台Nimbus宕机,自动热切到备份Nimbus。

• 任务之间影响小:新上线的任务不会冲击老的任务。采用 CGroups 对资源进行硬隔离,保证程序之间CPU不发生抢占。

• 解决 Disruptor 急剧消耗 CPU 问题:当原生 Disruptor 队列慢时,生产方会不断轮询检查Disruptor队列是否有空的Slot,极大消耗队列。

• 调度更强大,彻底解决了 Storm 任务分配不均衡问题。从 CPU、内存、磁盘、网络4个维度进行任务分配。

• Classloader 隔离:解决应用的类和 JStorm 的类发生冲突的问题。将应用的类放置在自己的类空间中。

• 监控更强大:Web UI上展示更多的监控。Task级别,每一个模块消耗时间和队列长度;Worker 级别,每一个模块消耗时间、队列长度、CPU/Memory 使用以及网络时延;还包括用户自定义监控数据。

• 在 JStorm 的介绍中,JStorm 上的应用能够在一行代码都不需要改动的情况下运行在Storm平台上,结合JStorm的其他特性,这将给JStorm带来更广阔的使用选择。

JStrom的开发和更新速度非常快,用户活跃度也很高。更多详细信息可以参考GitHub的介绍。

第2章 实时平台介绍

本章中的实时平台是指针对大数据进行实时分析的一整套系统,包括数据的收集、处理、存储等。一般而言,大数据有 4 个特点:Vol umn(大量)、Velocity(高速)、Variety (多样)、Value (价值),因此针对大数据的实时平台有以下特点。

• 低延迟:高延迟意味着实时性的缺失。

• 分布式:互联网时代,大多数的系统都是部署在一套由多台廉价Linux服务器组成的集群上。

• 高性能:高速产生的大量数据,通过计算分析获取其中的价值,这需要高性能可靠的处理模型。

• 高扩展性:整个系统要有较强的扩展性,数据井喷时能够通过快速部署解决系统的实时需求。而事实上,随着业务的增长,数据量、计算量会呈指数级增长,所以系统的高扩展性是必须的。

• 容错性:整个系统需要有较强的容错性,一个节点宕机不影响业务。

同时,对于应用开发者而言,平台上运行的应用程序容易开发和维护。各处理逻辑的分布、消息的分发以及消息分发的可靠性对于应用开发者是透明的。对于运维而言,平台还需要是可监控的。

结合互联网大数据应用的特点,我们基于Storm构建了实时平台。

2.1 实时平台架构介绍

当网站或者APP到达一定的用户量后,一般需要一套Tracker系统(如图2-1所示),收集用户行为(如用户IP地址、页面来源、城市名、浏览器版本、按钮位置等)、页面访问性能、异常出错等信息,然后根据一定的策略上报到日志服务器。搜索、推荐、广告、选品中心等开发团队分析这些日志,可以调整和开发各种功能;产品经理、高级管理人员等通过这些日志及时优化营运并进行正确决策;运维和应用开发人员根据这些日志进行排错和迭代产品等。Tracker系统在一个成熟的应用中扮演着重要的角色,随着业务的发展,对它的实时性要求也越来越高。图2-1 Tracker系统

Tracker系统一般采用JavaScript语言开发,支持自动打点字段、自动扩展字段等,在网站或者应用的各个页面的事件中嵌入Tracker系统的API,设置一定的策略发送到日志服务器,然后再同步到Kafka等消息队列。对于需要实时日志的应用,一般通过Storm等流式计算框架从消息队列中拉取消息,完成相关的过滤和计算,最后存到HBase、MySQL等数据库中;对于实时性要求不高的应用,消息队列中的日志消息通过Cloudera的Flume系统Sink到HDFS中,然后一般通过ETL、Hive或者批处理的Hadoop作业等抽取到HBase、MySQL等数据库中。如图2-2所示,日志服务器的数据也可以通过Flume系统Sink到Kafka等消息队列中,供Storm实时处理消息。图2-2 Flume的过程

2.2 Kafka架构

在Kafka的官方介绍中,Kafka定义为一个设计独特的消息系统。相比于一般的消息队列,Kafka提供了一些独特的特性,非常高的吞吐能力,以及强大的扩展性。本小节将简单介绍Kafka。

2.2.1 Kafka的基本术语和概念

Kafka中有以下一些概念。

• Broker:任何正在运行中的Kafka示例都称为Broker。

• Topic:Topic其实就是一个传统意义上的消息队列。

• Partition:即分区。一个 Topic将由多个分区组成,每个分区将存在独立的持久化文件,任何一个Consumer在分区上的消费一定是顺序的;当一个Consumer 同时在多个分区上消费时,Kafka不能保证总体上的强顺序性(对于强顺序性的一个实现是Exclusive Consumer,即独占消费,一个队列同时只能被一个Consumer消费,并且从该消费开始消费某个消息到其确认才算消费完成,在此期间任何Consumer不能再消费)。

• Producer:消息的生产者。

• Consumer:消息的消费者。

• Consumer Group:即消费组。一个消费组是由一个或者多个Consumer组成的,对于同一个 Topic,不同的消费组都将能消费到全量的消息,而同一个消费组中的Consumer将竞争每个消息(在多个Consumer消费同一个Topic时,Topic的任何一个分区将同时只能被一个Consumer消费)。

如图2-3所示,在Kafka中,消息将被生产者“推”(push)到Kafka中,Consumer会不停地轮询从Kafka中“拉”(pull)数据。图2-3 Kafka中消息的读写过程

2.2.2 Kafka在实时平台中的应用

在工作环境中,流式计算平台架构如图2-4所示。图2-4 流式计算平台架构

用户访问会源源不断地产生数据,数据要么存储在本地并在需要时发送到相关的应用,要么存储到一个统一的中央存储区中。产生的数据会被Storm中的Spout抓取、过滤并进行相关处理(例如应用之间协议解析、格式分析、数据校验等),然后发送到 Bolt 中进行数据分析,最终形成可用数据并存储到持久化介质(如DB)中,供其他应用获取。

数据暂存区的意义在于,首先数据是随着用户的访问而产生的,一般的平台在数据产生后要向其他分析程序“推”数据,而Storm是主动抓取数据并进行分析处理,是“拉”;其次即便在Storm中实现一个能够接受“推”的模型(如在Spout中增加内存队列等),当数据源突然增加时有可能导致Storm上应用并发度不足而引起其他状况,此时相当于对Storm发起一次DoS攻击。因此,去掉数据暂存区对Storm的维护、整个平台的运维而言都不是非常好的选择。

很多大数据实时平台的数据暂存区选用了Kafka,是基于Kafka的以下优点。

• 高性能:每秒钟能处理数以千计生产者生产的消息,详尽的数据请参考官网的压力测试结果。

• 高扩展性:当Kafka容量不够时可以通过简单增加服务器横向扩展Kafka集群的容量。

• 分布式:消息来自数以千计的服务,数据量比较巨大,单机显然不能处理这个量级的数据,为解决容量不足、性能不够等状况,分布式是必需的。

• 持久性:Kafka会将数据持久化到硬盘上,以防止数据的丢失。

• Kafka相对比较活跃,而且在Storm0.9.2中,Kafka已经是StormSpout中的可选Spout。

本节将简单描述Kafka,关于其更详尽的信息请直接参考Kafka官方文档:http://Kafka. apache.org/documentation.html。

Kafka是由LinkedIn开源的高效的持久化的日志型消息队列,利用磁盘高效的顺序读写特性使得在很多场景下,瓶颈甚至不在于磁盘读写而在于网络的传输上。与Amazon的Dynamo引领了一批NoSQL类似,Kafka的设计哲学很值得借鉴,在国内很多公司内部的消息队列中均能够看到Kafka的身影,如makfa、metaq、equeue等。以下将简单介绍Kafka,关于Kafka更多的内容详见附录A或者请查阅官方文档。

2.2.3 消息的持久化和顺序读写

Kafka没有使用内存作为缓存,而是直接将数据顺序地持久化到硬盘上(事实上数据是以块的方式持久化的),同时Kafka 中的每个队列可以包含多个区并分别持久化到不同的文件中。关于顺序读写的分析,在Kafka的官方介绍中有这样的描述:“在一个6×7200rpm SATARAID-5的磁盘阵列上线性写的速度大概是300MB/s,但是随机写的速度只有50KB/s。”

2.2.4 sendfile系统调用和零复制

在数据发送端,Kafka使用sendfile调用减少了数据从硬盘读取到发送之间内核态和用户态之间的数据复制。

传统上,当用户需要读取磁盘上的数据并发送到客户端时,会经历这样的步骤:打开文件磁盘上的文件准备读取,创建远端套接字(socket)的连接,循环从磁盘上读取数据,将读取到的数据发送出去,发送完成后关闭文件和远端连接。仔细分析其中的步骤,我们会发现,在这个过程中,一份数据的发送需要多次复制。首先,通过read调用每次从磁盘上读取文件,数据会被从磁盘上复制到内核空间,然后再被复制到读取进程所在的用户空间。其次,通过write调用将数据从进程所在的用户空间发送出去时,数据会被从用户空间复制到内核空间,再被复制到对应的网卡缓冲区,最终发送出去。期间数据经历了多次复制以及在用户态和内核态之间的多次转换,每一次都将产生一个非常昂贵的上下文切换,当有大量的数据仅仅需要从文件读出并被发送时代价会非常大。

sendfile 系统调用优化了上述流程:数据将首先从磁盘复制到内核空间,再从内核空间复制到发送缓冲区,最终被发送出去。在 Linux 系统中,sendfile可以支持将数据发送到文件、网络设备(网卡)或者其他设备上。sendfile是Kernel 2.2提供的新特性(从glibc 2.1开始提供头文件)。

图2-5中简单对比了使用一般read/write和使用sendfile将数据从硬盘中读出并发送的过程。图2-5 read/write和sendfile系统调用对比

通过分析,我们可以发现,通过简单的read/write读取并发送数据,需要4次系统调用以及4次数据复制;而使用sendfile只产生2次系统调用及数据复制。由于每一次空间切换内核将产生中断、保护现场(堆栈、寄存器的值需要保护以备执行完成后切换回来)等动作,每一次数据复制消耗大量CPU。sendfile对这两个优化带来的变化是数据发送吞吐量提高,同时减少了对CPU资源的消耗。当存在大量需要从硬盘上发送的数据时,其优势将非常明显。也正因此,很多涉及文件下载、发送的服务都支持直接sendfile调用,如Apache httpd、Nginx、Lighttpd 等。

2.2.5 Kafka的客户端

Kafka目前支持的客户端有C/C++、Java、.NET、Python、Ruby、Perl、Clojure、Erlang、Scala等,甚至还提供了HTTP REST 的访问接口。

在消息生产端,可以预定义消息的投放规则,如某些消息该向哪个Partition发送(如可以按照消息中的某个字段,如用户字段,进行哈希,使得所有该用户的消息都发送到同一个Partition上)。

在消息的消费端,客户端会将消息消费的偏移量记录到ZooKeeper中。如果需要事务性的支持,可以将偏移量的存储放在事务中进行:除非消息被消费并被处理完成,否则事务的回滚将满足再次消费的目的。

2.2.6 Kafka的扩展

Kafka依赖于ZooKeeper,集群的扩容非常方便,直接启动一个新的节点即可。对于已经存在的消息队列,Kafka 提供了相关的工具(kafka-reassign-partitions.sh)将数据迁移到新节点上。在0.8.1版本中,该工具尚不能在保证迁移的同时保证负载均衡。

2.3 大众点评实时平台

大众点评网于2003年4月成立于上海。大众点评网是中国领先的城市生活消费平台,也是全球最早建立的独立第三方消费点评网站。大众点评不仅为用户提供商户信息、消费点评及消费优惠等信息服务,同时亦提供团购、餐厅预订、外卖及电子会员卡等O2O(Online to Offline)交易服务。大众点评网是国内最早开发本地生活移动应用的企业,目前已经成长为一家领先的移动互联网公司,大众点评移动客户端已成为本地生活必备工具。

2.3.1 相关数据

截止到2014年第三季度,大众点评网月活跃用户数超过1.7亿,点评数量超过 4200万条,收录商户数量超过1000万家,覆盖全国2300多个城市及美国、日本、法国、澳大利亚、韩国、新加坡、泰国、越南、马来西亚、印度尼西亚、柬埔寨、马尔代夫、毛里求斯等近百个热门旅游国家和地区。

截止到2014年第三季度,大众点评月综合浏览量(网站及移动设备)超过80亿,其中移动客户端的浏览量超过80%,移动客户端累计独立用户数超过1.8亿。

目前,除上海总部之外,大众点评已经在北京、广州、天津、杭州、南京等130座城市设立分支机构。

2.3.2 实时平台简介

目前大众点评的实时数据平台经过一段时间的搭建已经基本成型。平台包括了一系列的工具和系统,大部分系统是在原有系统的基础上适当增加功能来完成。主要部分包括了日志打点和收集系统、数据传输和计算平台、持久化数据服务以及在线数据服务等部分。(1)日志的传输和收集,主要依赖Blackhole和Puma来完成。Blackhole是一个大众点评自己开发的类似于Kafka的分布式消息系统,收集了除MySQL日志以外的所有数据源的日志,并以流的形式提供了批量和实时两种数据消费方式。2.3.3节将具体介绍Blackhole。Puma是以MySQL从节点(slave node)的方式运行,接收MySQL的binlog,解析binlog,然后以MQ的形式提供数据服务。

日志打点和收集系统包括了以下几个日志数据源。

• 浏览器自助打点服务,供产品经理和运营人员,数据分析人员在页面上配置打点;配置完成后,系统自动将需要打点的地方推送到前端网页上,用户浏览网页时候的点击行为以及鼠标悬停等就会触发相应的日志数据,实时传回后端的日志服务器。

• 在大众点评的 3 个主要 APP(大众点评、大众点评团和周边快查)的框架中内含了所有页面的按钮、页面滑动以及页面切换等的埋点,只要用户有相应的操作,就会记录日志,批量发送到日志服务器。

• 此外,同其他平台的合作(如微信、QQ空间等)也有相应的埋点,记录对应的日志。

以上所有的用户浏览日志数据加上后端应用的日志、Nginx 日志和数据库的增删改日志等,一并通过日志收集系统实时地传输到日志的消费方(主要是Storm中的Topology)。其他的数据源还包括MQ系统,由应用在执行过程中产生。(2)Storm是实时平台的核心组成部分,目前在Storm上运行了几十个业务Topology,日处理数据量在百亿级,峰值的数据TPS在10万左右。随着大众点评业务的发展,数据处理量仍在快速增加。(3)Topology中Bolt计算的结果数据和中间交换数据根据业务需求存放在Redis、HBase或者MySQL中。(4)数据持久化到相应的数据库中后,由RPC服务器提供对外统一的访问服务,用户不用关心数据存储的细节、位置和容错,直接获取数据。

整个平台的系统架构如图2-6所示。图2-6 大众点评实时平台系统架构

2.3.3 Blackhole

Blackhole是类似于Kafka的一个流式系统,是大众点评的数据收集和订阅消费的平台。数据仓库的所有日志数据都是由Blackhole来完成收集并存入HDFS中的。Blackhole每天收集超过2 TB的日志数据。Blackhole的Agent 同其他平台工具一起部署在所有的几千台线上机器中,批量日志收集保证数据无丢失,实时数据保证高实效性和高性能。

Blackhole具有良好的水平扩展性和容错能力。内部基于行为(actor-based)的分布式系统实现系统的高性能;采用 Kafka 类似的提交日志(commit log)保证数据完整性。在Blackhole中,分为4类角色,即Supervisor、Broker、Agent和Consumer。

• Supervisor:Supervisor是管理者,负责所有的调度以及元数据管理。Agent、Broker和Consumer都和Supervisor维持了心跳信息,如果某个Broker失败了,Supervisor会让这个Broker连接的Agent和Consumer转移到其他Broker节点上。进行相应的动态扩容以后,Supervisor会发起rebalance操作,保持负载均衡。

• Broker:Broker是数据的管理者。Agent向Broker上报数据,Broker会在本地磁盘缓存数据,用于可靠性保障。Consumer向Broker发送数据所在文件位置的偏移量,获取对应具体的数据。同一个数据源的数据会发送到多个Broker中以达到负载均衡的效果。同时Broker会批量地将日志文件上传到HDFS中,用于后续的作业和各种数据分析。

• Agent:Agent监听相应的日志文件,是数据的生产者,它将日志发送到Broker。

• Consumer:Consumer实时地从Broker中获取日志数据。通常将Storm的Spout作为具体的Consumer来消费数据。

Blackhole体系架构如图2-7所示。图2-7 Blackhole体系架构

2.4 1号店实时平台

1号店于2008年7月成立于上海,开创了中国电子商务行业“网上超市”的先河。至2013年年底,覆盖了食品饮料、生鲜、进口食品、美容护理、服饰鞋靴、厨卫清洁用品、母婴用品、数码手机、家居用品、家电、保健器械、电脑办公、箱包珠宝手表、运动户外、礼品等14个品类。1号店是中国第一家自营生鲜的综合性电商;在食品饮料尤其进口食品方面,牢牢占据中国B2C电商行业第一的市场份额;进口牛奶的销量占到全国海关进口总额的37.2%;在洗护发、沐浴、女性护理、口腔护理产品等细分品类保持了中国B2C电商行业第一的市场份额;手机在线销售的市场份额跻身中国B2C电商行业前三名。

1号店拥有9 000万注册用户,800多万的SKU,2013年实现了115.4亿元的销售业绩,数据平台处理3亿多的独立用户ID(未登录用户和登录用户),100T的数据量。

2013年规划1号店实时平台时,主要的应用为个性化推荐、反爬虫、反欺诈分析、商铺订单、流量实时分析和BI实时报表统计。平台搭建之初,已上线的应用中每天需要实时分析的数据量峰值在 450 GB 左右,秒级别延迟。基于 Storm 的流计算也同样适用于搜索实时索引、移动端流量分析、广告曝光数据分析、风险控制和移动端访问数据分析等应用场景。

和所有互联网公司大数据分析服务一样,1号店的数据服务包括数据的采集、收集、分析、持久化、应用引擎、推送和展示等。数据的收集主要来自基于 JavaScript 的内部实现的服务(如Tracker、基于开源的HAProxy的日志等),数据收集后,部分要求准实时的服务会暂时持久化到硬盘上,后通过Flume(这里使用的Flume是Flume-ng版本1.4,以下不再赘述)、syslogd等推送到Kafka中,Storm上的实时应用实时获取Kafka中的数据进行分析,并将结果持久化供相关业务使用和展示。

1号店实时处理平台架构如图2-8所示。图2-8 1号店实时处理平台架构

整个平台用于处理用户访问产生的数据,包括行为数据、HA Log、广告曝光数据和流量数据等,数据会在产生的第一时间被收集并发送到日志转发服务(如Scribe、Flume)上,然后由日志转发服务将其推到Kafka对应的Topic中。如果需要通过Hadoop计算全量,也会推送到HDFS中。运行在Storm中的应用会读取Kafka中的数据进行分析,并将分析结果持久化到持久化层中。推送引擎主动获取持久化层中的数据,将处理结果推送到对应的业务系统并最终展示给用户。在整个平台中,使用Flume作为数据推送组件是基于以下几点考虑。

• Flume能够接收多种数据源,包括获取控制台输出、tail、syslogd、exec 等,支持TCP和UDP协议。

• Flume支持基于内存、文件等通道,数据在转发到相关服务之前暂时存放于通道内。

• Flume支持多种数据推送,如将数据推送到HDFS、MySQL、HBase、MongoDB中。

• Flume有着非常优雅的实现,通过编写相应的plugin,能够轻易扩展支持其他类型的数据源和推送。

• Flume具有高性能。

使用Kafka作为数据的缓存主要是基于以下几点考虑。

• 某些数据会被多种业务使用,如访问日志,既用于反爬虫分析也用于反欺诈、反注入分析,一个同样的数据会被消费多次,而Kafka能够满足该需求。

• 从实时平台而言,Storm中Spout 的消息消费类型属于“拉”模式,而数据产生服务属于“推”模式(有访问就有数据),中间需要同时支持“推”和“拉”的消息平台。

• Kafka 在单台6块硬盘的服务器上实测峰值能够达到600 Mbit/s,数据的产生和消费是准实时的,性能上是可以接受的。

• 对于互联网应用而言,数据的高峰可能是间歇性、井喷性的,如“大促”、“周年庆”、“双11”等时段的流量可能是平时的5倍甚至10倍。从就成本而言,与其维护一个容量为平时流量10倍的集群倒不如维护一个容量为平时2~3倍容量而数据井喷时允许一定的延迟的集群更划算些。

第3章 Storm集群部署和配置

本章中主要介绍了Storm的部署过程以及相关的配置信息。通过本章内容,帮助读者从零开始搭建一个Storm集群。相关的过程和主要的配置选项是Storm的运维人员需要重点关注的,对部署和配置选项不感兴趣的读者,可以跳过本章。

在开始 Storm之旅前,我们先看一下Storm部署和配置的相关信息,并提交一个Topology,了解Storm的基本原理。Storm的部署模式包括单机和集群环境,同时在向Storm环境中提交Topology时,可以提交为本地(LocalCluster)或集群模式。Storm上应用的第一编程语言是Java,通过Storm的本地集群模式,Topology可以在Eclipse中直接运行、调试,因此关于Storm的部署这里只涉及本地和集群模式。

在本地模式中,Storm会在进程中模拟Storm集群的功能,编写的Topology代码无需提交可以直接在本地运行,这对于开发和测试Topology,非常有好处。

3.1 Storm的依赖组件

要部署Storm,需要部署以下几个相关组件。

• JDK:可以到 Oracle 官网下载并部署,设置环境变量(JAVA_HOME、PATH 和CLASSPATH)并使之生效;JDK 部署完成后通过 java -version 命令可以查看到对应的JDK版本,如图3-1所示。图3-1 JDK安装测试

• ZooKeeper:Storm本身重度依赖于ZooKeeper,同时在我们线上的环境中还有其他依赖于ZooKeeper的服务,因此单独部署一个专门用于流式计算的ZooKeeper是非常有必要的。

• Storm:Storm可以从其官方地址http://storm.apache.org/下载。

若部署0.9版本之前的Storm,还需要安装ZMQ和JZMQ(除非作为研究之用,否则不推荐)。Storm在0.9之前使用的消息传输机制是ZMQ,从0.9开始引入Netty(也还支持ZMQ)。相比ZMQ的C实现,纯Java实现的Netty能够提供更好的性能和可管理性(ZMQ不能通过-Xmx等对内存进行管理)。

在Yahoo!中运行着一个超过250个节点的Storm集群,雅虎改进了Storm对ZooKeeper的依赖,使得0.9.2版本时一个ZooKeeper集群已经能够支持2000个节点,而他们的目标是到2015年一个Storm集群支持超过4000个节点。

3.2 Storm的部署环境

Storm集群分为Nimbus节点和Supervisor节点。

• Nimbus节点:用于提交应用Topology、管理整个Storm节点(将Topology的Task分配给Worker、监控各个Supervisor节点的状态进行负载均衡等)。Nimbus节点上不能运行Worker。

• Supervisor节点:负责从ZooKeeper上获取、启动并运行任务。

因此相对而言,我们认为Nimbus并不需要Supervisor节点那么高的配置,在我们的测试环境中,Nimbus的硬件配置只有Supervisor节点的一半。Storm UI节点也不需要高配置,可以和Nimbus节点在同一台机器上。

3.3 部署Storm服务

以下简要介绍一下Storm的部署。

3.3.1 部署ZooKeeper

在我们编写本书时,ZooKeeper最新稳定版本已经是3.4.6,鉴于我们的环境上运行的是3.4.5且该版本在生产环境中已经稳定运行较长时间,因此本书是基于3.4.5版本(在部署上3.4.5版本和3.4.6版本并没有任何的不同之处)。ZooKeeper的部署可以参考ZooKeeper官网的安装手册:http://ZooKeeper.apache.org/doc/trunk/ZooKeeperStarted.html。ZooKeeper的安装文件可以从http://www.apache.org/dyn/closer.cgi/ZooKeeper/下载。直接下载编译好的版本,解压,修改相应的配置即可,在此不再赘述。

从版本3.4.0开始,ZooKeeper提供了自动清理快照(snapshot)和事务日志的功能,需要在zoo.cfg配置文件中设置。

autopurge.purgeInterval=1

autopurge.snapRetainCount=3

• autopurge.purgeInterval:这个参数指定了持久化日志清理频率,单位是小时,默认是0,表示不开启自动清理功能。

• autopurge.snapRetainCount:这个参数和上面的参数搭配使用,用于指定需要保留的持久化日志文件数目,默认是保留3个。

值得注意的是,ZooKeeper推荐部署奇数台服务器(根据ZooKeeper的特性,2N+1台的ZooKeeper集群,当N个节点不能访问时,整个ZooKeeper仍然是可用的)。

3.3.2 部署Storm

在Storm官网上(http://storm.apache.org/downloads.html)可以获取到Storm的最新和最近几个版本。

在编写本书时,Storm最新稳定版本已经是0.9.3,鉴于在我们的环境中使用的是Storm 0.9.0.1 版本且该版本经过一些参数调整后已经稳定运行,本节中使用的仍旧是0.9.0.1。

Storm可以下载编译好的版本并在下载完成后将其放入安装路径中。我们习惯于新建一个路径用于安装所有流计算相关的组件(如 Flume、Kafka、ZooKeeper、Storm等),例如:

/home/storm

将Storm安装文件移动到安装路径下:

mv storm-0.9.0.1.tar.gz /home/storm/

解压安装包:

tar zxvf storm-0.9.0.1.tar.gz

3.3.3 配置Storm

Storm的配置文件为storm-0.9.0.1/conf/storm.yaml。在运行Storm进程之前,需要对该配置文件进行基本配置。表3-1列出了Storm中部分比较重要的配置信息。表3-1 Storm的配置项续表

需要注意的是,Storm的配置文件为yaml文件,配置项后面必须跟一个空格才能跟配置值。

除了conf/storm.yaml配置文件之外,还有两个需要注意的配置。(1)logback/cluster.xml文件,其中可以配置Storm的日志级别矩阵信息等。(2)操作系统的配置(通过ulimit -a查看),其中有两项信息需要配置。

• open files:当前用户可以打开的文件描述符数。

试读结束[说明:试读内容隐藏了图片]

下载完整电子书


相关推荐

最新文章


© 2020 txtepub下载