Storm源码分析(txt+pdf+epub+mobi电子书下载)


发布时间:2020-07-20 23:57:56

点击下载

作者:李明,王晓鹏

出版社:人民邮电出版社

格式: AZW3, DOCX, EPUB, MOBI, PDF, TXT

Storm源码分析

Storm源码分析试读:

前言

Storm是一个分布式的、可靠的实时计算系统。与Hadoop的批处理不同,Storm采用流式的消息处理方法,它使得消息可以得到快速的处理,可以用于实时性要求较高的系统,例如广告点击的在线统计等。Storm弥补了Hadoop在实时处理方面的缺陷,目前被各大互联网公司广泛使用并日益流行。

本书作为第一本深入介绍Storm的图书,从源代码的角度详细剖析了Storm的设计与实现。本书适合各类型的计算机工作者,初学者可以通过本书来学习如何实现一个可靠的、高容错性的、实时的分布式处理平台。而对于Storm用户来讲,本书不仅可以帮助他们更深入地了解这套系统的工作原理,还可以帮助他们正确地使用该平台,也有利于实现对Storm的二次开发。鉴于Storm是基于Clojure和Java开发的,所以需要读者对这两种语言有一定的了解。

本书主要分析阐述了Storm的底层架构,例如Nimbus、Supervisor、Worker、Executor以及Task,并对Storm如何实现可靠的消息传输进行了系统讨论,例如事务Topology以及Trident。

本书对Storm的最新源代码进行了系统而详尽的分析,相信读者在阅读过程中一定会获益匪浅。

致谢

诚挚感谢人民邮电出版社和图灵公司为我提供创作的平台。感谢本书的编辑王军花、张霞等,你们的专业态度和细致工作提升了本书的品质。

诚挚感谢妻子包黎云,没有她的支持,我无法完成本书。最后,将该书献给我即将出世的孩子。——李明

诚挚感谢同事贺军、王明雨以及童杰,感谢你们在这本书的编写过程中给予我们的无私帮助。特别感谢我的妻子白鸽,感谢你对我的理解、包容和支持!——晓鹏第1章总体架构与代码结构

Storm是由BackType开发的一个实时、分布式的计算平台,后来Twitter收购了BackType并将其源代码开放。在GitHub上(https://github.com/nathanmarz/storm),我们可以获得Storm的最新源代码及相关文档。1.1Storm的总体结构

Storm中会涉及的术语包括Stream、Spout、Bolt、Worker、Executor、Task、Stream Grouping和Topology,现简要介绍如下。

●Stream是被处理的数据。

●Spout是数据源。

●Bolt封装了数据处理逻辑。

●Worker是工作进程。一个工作进程中可以含有一个或多个Executor线程。

●Executor是运行Spout或Bolt处理逻辑的线程。

●Task是Storm中的最小处理单元。一个Executor中可以包含一个或多个Task,消息的分发都是从一个Task到另一个Task进行的。

●Stream Grouping定义了消息分发策略,定义了Bolt节点以何种方式接收数据。消息可以随机分配(Shuffle Grouping,随机分组),或者根据字段值分配(Fields Grouping,字段分组),或者广播(All Grouping,全部分组),或者总是发给同一个Task(Global Grouping,全局分组),也可以不关心数据是如何分组的(None Grouping,无分组),或者由自定义逻辑来决定,即由消息发送者决定应该由消息接收者组件的哪个Task来处理该消息(Direct Grouping,直接分组)。

●Topology是由消息分组方式连接起来的Spout和Bolt节点网络,它定义了运算处理的拓扑结构,处理的是不断流动的消息。除非杀掉Topology,否则它将永远运行下去。

Storm的基本结构如图1-1所示。

Storm集群中存在两种类型的节点:运行Nimbus服务的主节点和运行Supervisor服务的工作节点。Storm集群由一个主节点和多个工作节点组成。主节点上运行一个名为“Nimbus”的守护进程,用于分配代码、布置任务及检测故障。每个工作节点则运行一个名为“Supervisor”的守护进程,用于监听工作、开始并终止工作进程。图1-1 Storm的基本结构

Nimbus和Supervisor都能快速失败并恢复,而且它们是无状态的,其元数据存储在ZooKeeper中,这使得系统具有很高的容错性。Nimbus与Supervisor之间的协调工作是通过ZooKeeper来完成的,它是Apache下面的开源项目,用于分布式系统的同步等,详情可参考http://zookeeper.apacheorg/。

Worker由Supervisor负责启动,一个Worker中可以有多个Executor线程,每个Executor中又可包含一个或多个Task。Task为Storm中的最小处理单元,它是Topology组件诸多并行度中的一个。每个Executor都会启动一个消息循环线程,用以接收、处理和发送消息。当Executor收到属于其下某一Task的消息后,就会调用该Task对应的处理逻辑对消息进行处理。

在逻辑上,Storm中消息的来源节点被称为Spout,消息的处理节点被称为Bolt。在系统中,可以存在多个Spout及Bolt,且每个Spout或Bolt都可设置不同的并行度,示例如图1-2所示。图1-2 示例图1.2Storm的元数据

Storm采用ZooKeeper来存储Nimbus、Supervisor、Worker以及Executor之间共享的元数据,这些模块在重启之后,可以通过对应的元数据进行恢复。因此Storm的模块是无状态的,这是保证其可靠性及可扩展性的基础。了解元数据以及Storm如何使用这些元数据,有助于我们更好地理解Storm的设计。1.2.1 元数据介绍

Storm在ZooKeeper中存储数据的目录结构如图1-3所示,这是一个根路径为/storm的树,树中的每一个节点代表ZooKeeper中的一个节点(znode),每一个叶子节点是Storm真正存储数据的地方。在图1-3中,从根节点到叶子节点的全路径代表了该数据在ZooKeeper中的存储路径,该路径可被用来写入或获取数据。图1-3 Storm在ZooKeeper中存储的数据

下面分别介绍ZooKeeper中每项数据的具体含义。

●/storm/workerbeats//node-port:它存储由node和port指定的Worker的运行状态和一些统计信息,主要包括storm-id(也即topology-id)、当前Worker上所有Executor的统计信息(如发送的消息数目、接收的消息数目等)、当前Worker的启动时间以及最后一次更新这些信息的时间。在一个topology-id下面,可能有多个node-port节点。它的内容在运行过程中会被更新。

●/storm/storms/:它存储Topology本身的信息,包括它的名字、启动时间、运行状态、要使用的Worker数目以及每个组件的并行度设置。它的内容在运行过程中是不变的。

●/storm/assignments/:它存储了Nimbus为每个Topology分配的任务信息,包括该Topology在Nimbus机器本地的存储目录、被分配到的Supervisor机器到主机名的映射关系、每个Executor运行在哪个Worker上以及每个Executor的启动时间。该节点的数据在运行过程中会被更新。

●/storm/supervisors/:它存储Supervisor机器本身的运行统计信息,主要包括最近一次更新时间、主机名、supervisor-id、已经使用的端口列表、所有的端口列表以及运行时间。该节点的数据在运行过程中也会被更新。

●/storm/errors///e:它存储运行过程中每个组件上发生的错误信息。是一个递增的序列号,每一个组件最多只会保留最近的10条错误信息。它的内容在运行过程中是不变的(但是有可能被删除)。1.2.2 Storm怎么使用这些元数据

了解了存储在ZooKeeper中的数据,我们自然想知道Storm是如何使用这些元数据的。例如,这些数据何时被写入、更新或删除,这些数据都是由哪种类型的节点(Nimbus、Supervisor、Worker或者Executor)来维护的。接下来,我们就简单介绍一下这些关系,希望读者能对Storm的整体设计实现有更深一层的认识。带上这些知识,能让你的Storm源码之路变得更加轻松愉快。

首先来看一下总体交互图,如图1-4所示。图1-4 总体交互图

这个图描述了Storm中每个节点跟ZooKeeper内元数据之间的读写依赖关系,详细介绍如下。

1.Nimbus

Nimbus既需要在ZooKeeper中创建元数据,也需要从ZooKeeper中获取元数据。下面简述图1-4中箭头1和箭头2的作用。

●箭头1表示由Nimbus创建的路径,包括:

 a./storm/workerbeats/

 b./storm/storms/

 c./storm/assignments/

其中对于路径a,Nimbus只会创建路径,不会设置数据(数据是由Worker设置的,后面会介绍);对于路径b和c,Nimbus在创建它们的时候就会设置数据。a和b只有在提交新Topology的时候才会创建,且b中的数据设置好后就不再变化,c则在第一次为该Topology进行任务分配的时候创建,若任务分配计划有变,Nimbus就会更新它的内容。

●箭头2表示Nimbus需要获取数据的路径,包括:

 a./storm/workerbeats//node-port

 b./storm/supervisors/

 c./storm/errors///e

Nimbus需要从路径a读取当前已被分配的Worker的运行状态。根据该信息,Nimbus可以得知哪些Worker状态正常,哪些需要被重新调度,同时还会获取到该Worker所有Executor统计信息,这些信息会通过UI呈现给用户。从路径b可以获取当前集群中所有Supervisor的状态,通过这些信息可以得知哪些Supervisor上还有空闲的资源可用,哪些Supervisor则已经不再活跃,需要将分配到它的任务分配到其他节点上。从路径c上可以获取当前所有的错误信息并通过UI呈现给用户。集群中可以动态增减机器,机器的增减会引起ZooKeeper中元数据的变化,Nimbus通过不断获取这些元数据信息来调整任务分配,故Storm具有良好的可扩展性。当Nimbus死掉时,其他节点是可以继续工作的,但是不能提交新的Topology,也不能重新进行任务分配和负载调整,因此目前Nimbus还是存在单点的问题。

2.Supervisor

同Nimbus类似,Superviser也要通过ZooKeeper来创建和获取元数据。除此之外,Supervisor还通过监控指定的本地文件来检测由它启动的所有Worker的运行状态。下面简述图1-4中箭头3、箭头4和箭头9的作用。

●箭头3表示Supervisor在ZooKeeper中创建的路径是/storm/supervisors/。新节点加入时,会在该路径下创建一个节点。值得注意的是,该节点是一个临时节点(创建ZooKeeper节点的一种模式),即只要Supervisor与ZooKeeper的连接稳定存在,该节点就一直存在;一旦连接断开,该节点则会被自动删除。该目录下的节点列表代表了目前活跃的机器。这保证了Nimbus能及时得知当前集群中机器的状态,这是Nimbus可以进行任务分配的基础,也是Storm具有容错性以及可扩展性的基础。

●箭头4表示Supervisor需要获取数据的路径是/storm/assignments/。我们知道它是Nimbus写入的对Topology的任务分配信息,Supervisor从该路径可以获取到Nimbus分配给它的所有任务。Supervisor在本地保存上次的分配信息,对比这两部分信息可以得知分配信息是否有变化。若发生变化,则需要关闭被移除任务所对应的Worker,并启动新的Worker执行新分配的任务。Nimbus会尽量保持任务分配的稳定性,我们将在第7章中进行详细分析。

●箭头9表示Supervisor会从LocalState(相关内容会在第4章中介绍)中获取由它启动的所有Worker的心跳信息。Supervisor会每隔一段时间检查一次这些心跳信息,如果发现某个Worker在这段时间内没有更新心跳信息,表明该Worker当前的运行状态出了问题。这时Supervisor就会杀掉这个Worker,原本分配给这个Worker的任务也会被Nimbus重新分配。

3.Worker

Worker也需要利用ZooKeeper来创建和获取元数据,同时它还需要利用本地的文件来记录自己的心跳信息。

下面简述图4-1中箭头5、箭头6和箭头8的作用。

●箭头5表示Worker在ZooKeeper中创建的路径是/storm/workerbeats//node- port。在Worker启动时,将创建一个与其对应的节点,相当于对自身进行注册。需要注意的是,Nimbus在Topology被提交时只会创建路径/storm/workerbeats/,而不会设置数据,数据则留到Worker启动之后由Worker创建。这样安排的目的之一是为了避免多个Worker同时创建路径时所导致的冲突。

●箭头6表示Worker需要获取数据的路径是/storm/assignments/,Worker会从这些任务分配信息中取出分配给它的任务并执行。

●箭头8表示Worker在LocalState中保存心跳信息。LocalState实际上将这些信息保存在本地文件中,Worker用这些信息跟Supervisor保持心跳,每隔几秒钟需要更新一次心跳信息。Worker与Supervisor属于不同的进程,因而Storm采用本地文件的方式来传递心跳。

4.Executor

Executor只会利用ZooKeeper来记录自己的运行错误信息,下面简述图4-1中箭头7的作用。

箭头7表示Executor在ZooKeeper中创建的路径是/storm/errors///e。每个Executor会在运行过程中记录发生的错误。

5.小结

从前面的描述中可以得知,Nimbus、Supervisor以及Worker两两之间都需要维持心跳信息,它们的心跳关系如下。

●Nimbus和Supervisor之间通过/storm/supervisors/路径对应的数据进行心跳保持。Supervisor创建这个路径时采用的是临时节点模式,所以只要Supervisor死掉,对应路径的数据就会被删掉,Nimbus就会将原本分配给该Supervisor的任务重新分配。

●Worker跟Nimbus之间通过/storm/workerbeats//node-port中的数据进行心跳保持。Nimbus会每隔一定时间获取该路径下的数据,同时Nimbus还会在它的内存中保存上一次的信息。如果发现某个Worker的心跳信息有一段时间没更新,就认为该Worker已经死掉了,Nimbus会对任务进行重新分配,将分配至该Worker的任务分配给其他Worker。

●Worker跟Supervisor之间通过本地文件(基于LocalState)进行心跳保持。1.3Storm的代码结构

在本书中,我们主要分析Storm 0.9.0的源代码,其下载地址为https://github.com/nathanmarz/storm/tree/0.9.0。

Storm的源代码主要基于Clojure以及Java来完成,下面简要介绍一下主要的名字空间。1.3.1 Clojure代码

这部分代码为Storm基础架构的实现,其中Nimbus、Supervisor、Worker、Executor以及Task这些基础组件的实现位于src\clj\backtype.storm下,如表1-1所示。表1-1 Clojure代码1.3.2 Java代码

这部分代码含有Storm基础的流处理以及事务Topology的实现,位于src\jvm\backtype.storm下,如表1-2所示。表1-2 Java代码1.3.3 Trident代码

Trident是Storm对实时消息处理的更高层抽象,是Storm的发展方向之一,详情可参见https://github.com/nathanmarz/storm/wiki/Trident-tutorial。Trident代码位于src\jvm\storm.trident下,具体如表1-3所示。表1-3 Trident代码1.3.4 其他代码

除了以上介绍的三部分之外,Storm还定义了一些基础工具类以及扩展类,主要包括以下几项。

●storm.thrift以及genthrift.sh:Storm基础数据结构和服务的Thrift定义文件以及产生脚本。

●src\ui:定义了Storm UI的资源文件。

●src\multilang:Storm的多语言支持示例。

●src\clj\zilch\mq.clj:ZMQ的使用包装。

访问下面的链接以获得代码库结构的相关信息:

https://github.com/nathanmarz/storm/wiki/Structure-of-the-codebase。第2章搭建Storm集群

在开始研究Storm之前,我们先来搭建单机的和多机的Storm运行环境,然后提交一个示例Topology到搭建好的集群上使其运行,最后再分析一下这个示例Topology的组成。通过学习这一章并进行实践,读者不仅可以对Storm有一个初步的认识,而且能进一步了解Storm的运行原理。2.1搭建单机Storm集群

为了更好地理解Storm的运行原理,我们可以在单机搭建一套Storm运行环境来模拟真实的Storm集群,这有助于我们进一步了解Storm的运行机制,同时还可以基于它进行本地调试。

下面我们会一步一步介绍如何搭建一个本地的Storm运行环境(操作系统版本是Ubuntu 12.04)。

1.下载所需的资源

搭建Storm本地运行环境时,需要下载的资源如下所示。

●Storm:http://storm-project.net/downloads.html,版本0.9.0rc2。

●ZooKeeper:http://www.apache.org/dyn/closer.cgi/zookeeper/,版本3.4.5。

●ZMQ:http://download.zeromq.org/,版本2.1.7。

●jzmq:https://github.com/nathanmarz/jzmq/archive/master.zip。

我们将下载完的所有文件保存在~/project/Storm/文件夹下,并分别解压,相关代码如下。tar -xvf zookeeper-3.4.5.tar.gztar -xvf zookeeper-3.4.5.tar.gzunzip jzmq-master.zipunzip storm-0.9.0-rc2.zip

2.安装JDK

这里我们使用openjdk,安装命令是:sudo apt-get install openjdk-7-jdk

安装完成后,需要为其设置环境变量。修改文件~/.profile,添加以下内容:export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-i386export CLASSPATH=$JAVA_HOME/libexport PATH=$JAVA_HOME/bin:$PATH

保存修改后,运行如下命令使其立即生效:source ~/.profile

3.安装依赖的库文件

Storm及其组件需要依赖很多库文件才能正常工作。依次运行以下命令,安装所有的库:sudo apt-get install libtoolsudo apt-get install autoconfsudo apt-get install automakesudo apt-get install g++sudo apt-get install uuid-devsudo apt-get install uuidsudo apt-get install e2fsprogssudo apt-get install python

4.安装ZMQ

进入解压后的zeromq-2.1.7文件夹,依次运行以下命令:./configuremakesudo make installsudo ldconfig

5.安装jzmq

由于ZMQ是C/C++的库文件,Storm是基于JVM的,没办法直接使。jzmq是用JNI封装的ZMQ的Java库,Storm需要通过它来使用ZMQ。

进入解压后的jzmq-master文件夹中,依次运行以下命令:./autogen.sh./configuremakesudo make install

如果运行make命令的过程中发生以下错误:*** No rule to make target classdist_noinst.stamp', needed by org/zeromq/ZMQ.class'.? Stop.

则执行以下操作。

首先,执行以下命令:touch src/classdist_noinst.stamp

接着进入src/org/zeromq文件夹中执行javac *.java这条命令,最后回退到jzmq-master文件夹的根目录下,依次执行:makesudo make install

6.启动ZooKeeper

进入解压后的zookeeper-3.4.5文件夹中,将文件./conf/zoo_sample.cfg重命名为./conf/zoo.cfg。运行以下命令启动ZooKeeper:bin/zkServer.sh start

然后检查ZooKeeper是否成功启动,此时先执行如下命令:bin/zkCli.sh –server 127.0.0.1:2181

此时会出现一个交互窗口,在其中运行 ls /。

7.启动Storm

进入解压后的storm-0.9.0-rc2文件夹的bin目录中,依次执行以下命令启动Nimbus、Supervisor以及UI:./storm nimbus./storm supervisor./storm ui

等待命令都执行完成后,打开链接http://localhost:8080,此时应该能看到Storm UI界面。

8.编译storm-starter jar包

接下来,我们在运行起来的集群上提交一个Topology,这里我们使用storm-starter做示范。

如果没有安装过git工具,可以运行下面的命令安装:sudo apt-get install git

如果没有安装过leiningen工具,则按照https://github.com/technomancy/leiningen的步骤安装。我们把下载下来的storm-starter源代码保存在~/project/storm-starter中。首先,进入该目录中:cd ~/project/storm-starter

从GitHub上克隆一份storm-starter的源代码:git clone git://github.com/nathanmarz/storm-starter.git

依次执行以下命令创建项目jar包:lein depslein compilelein install

创建好的jar包storm-starter-0.0.1-SNAPSHOT.jar位于target目录下。

9.提交Topology

进入storm-0.9.0-rc2文件夹中的bin目录中,运行以下命令提交Topology:./storm jar ~/project/storm-stater/target/storm-starter-0.0.1-SNAPSHOT.jar storm.starter.WordCount  Topology wordcount

等待提交结束后,刷新页面http://localhost:8080,我们可以看到提交的“wordcount”Topology,点击wordcount可以看到其详细运行情况。

至此,我们已经成功地在本地搭建好Storm集群,并且成功地运行了我们的第一个Topology。2.2搭建多机Storm集群

我们可以用多台机器搭建一个多机运行环境,这也是Storm的实际应用场景,下面我们来看一下怎么搭建这样的集群。

假设我们有3台机器M1、M2以及M3,它们的IP地址分别为10.1.172.1、10.1.172.2以及10.1.172.3,具体的分配情况如下:

●M1作为Nimbus

●M2作为ZooKeeper

●M3作为Supervisor2.2.1 设置环境

首先,我们需要按照搭建单机集群的方式在每台机器上都搭建好Storm环境,也即执行2.1节的前5步。

接下来,修改M1和M3上的storm.yaml文件,它的路径是~/project/Storm/storm-0.9.0-rc2/conf/。修改storm.yaml的内容为:java.library.path: "/usr/local/lib:/usr/lib:/opt/local/lib"storm.zookeeper.servers:  - "10.1.172.2"nimbus.host: "10.1.172.1"ui.port: 83supervisor.slots.ports:  - 6700  - 6701  - 6702  - 6703

下面解释一下这些配置项。

●java.library.path:该配置项配置启动Storm所需lib包的路径。

●storm.zookeeper.servers:该配置项配置了当前集群中所有ZooKeeper机器的IP地址。我们只有一个ZooKeeper服务器,所以只配置了一个IP。

●nimbus.host:该配置项指明了Nimbus机器的IP地址。

●ui.port:该配置项配置了Storm UI使用的端口。如果不配置该项,默认使用8080端口,这里设置为使用83端口。

●supervisor.slots.ports:该配置项指明了一台Supervisor机器上所有可以使用的slot信息,也即端口号。表明该机器上最多可以启动4个Worker。

Storm还提供了很多其他配置项,我们会在最后一章中对这些常见配置项进行详细的介绍。2.2.2 启动Storm集群

首先,在M2上启动ZooKeeper服务,并验证它是否成功启动。

接下来,在M1上进入storm-0.9.0-rc2文件夹的bin目录,依次执行以下命令启动Nimbus和UI:./storm nimbus./storm ui

最后在M3上进入storm-0.9.0-rc2文件夹的bin目录中,执行以下命令启动Supervisor:./storm supervisor

等待都启动完成后,访问http://10.1.172.1:83就能看到启动起来的集群。2.2.3 提交Topology

在M1(也即Nimbus所在的机器)上将WordCountTopology提交到集群中,然后刷新一下http://10.1.172.1:83页面,就能看到提交的Topology了。

至此,我们就完成了部署一个简单多机Storm 集群并提交Topology到集群运行的所有步骤。接下来,我们借助WordCountTopology示例,介绍一下Topology的组成。2.3WordCountTopology介绍

WordCountTopology是一个基本的Storm Topology,由三个组件构成:

●RandomSentenceSpout

●SplitSentence

●WordCount

下面分别介绍这几个组件。2.3.1 RandomSentenceSpout

这个类定义了一个Spout,它继承自BaseRichSpout。BaseRichSpout是一个实现了IRichBolt接口的虚类,这个接口是Storm中的一个主要接口。它的nextTuple方法随机地从一个句子数组中选出一个句子发送出去,declareOutputFields方法声明了该Spout输出的消息模式,这里输出只有一列,字段名是word:public class RandomSentenceSpout extends BaseRichSpout {  SpoutOutputCollector _collector;  Random _rand;  @Override  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {    _collector = collector;    _rand = new Random();  }  @Override  public void nextTuple() {    Utils.sleep(100);    String[] sentences = new String[] {      "the cow jumped over the moon",      "an apple a day keeps the doctor away",      "four score and seven years ago",      "snow white and the seven dwarfs",      "i am at two with nature"};    String sentence = sentences[_rand.nextInt(sentences.length)];    _collector.emit(new Values(sentence));  }  @Override  public void ack(Object id) {  }  @Override  public void fail(Object id) {  }  @Override  public void declareOutputFields(OutputFieldsDeclarer declarer) {    declarer.declare(new Fields("word"));  }}2.3.2 SplitSentence

该类定义了一个Bolt,它继承自BaseBasicBolt。BaseBasicBolt是一个实现了IBasicBolt接口的虚类。execute方法是Bolt真正处理业务逻辑的地方,它将从Spout收到的句子按照空格分割,然后把每一个单词作为一条信息发送出去。declareOutputFields方法声明该Bolt的输出消息格式,这里也只有一列,字段名是word。SplitSentence类的定义如下:public static class SplitSentence extends BaseBasicBolt {  public void execute(Tuple tuple, BasicOutputCollector collector) {    String sentence = tuple.getString(0);    for (String word : sentence.split(" "))      collector.emit(new Values(word));  }  @Override  public void declareOutputFields(OutputFieldsDeclarer declarer) {    declarer.declare(new Fields("word"));  }}2.3.3 WordCount

类WordCount跟SplitSentence类似,也定义了一个Bolt。这个类对收到的所有单词进行计数统计,execute方法更新收到单词的缓存数,并将当前该单词及其对应的数目发送出去;declareOutputFields方法声明该Bolt的输出消息格式,这里输出有两列,字段名分别是word和count;cleanup方法在该Topology被停掉的时候被调用(不保证一定能够调用到),它将当前缓存的所有单词及数目信息打印到日志中。类WordCount的定义如下:public static class WordCount extends BaseBasicBolt {  Map counts = new HashMap();  @Override  public void execute(Tuple tuple, BasicOutputCollector collector) {    String word = tuple.getString(0);    Integer count = counts.get(word);    if(count==null) count = 0;    count++;    counts.put(word, count);    collector.emit(new Values(word, count));  }  @Override  public void declareOutputFields(OutputFieldsDeclarer declarer) {    declarer.declare(new Fields("word", "count"));  }  @Override  public void cleanup(){    for(Map.Entry entry : counts.entrySet()){      logger.info(entry.getKey() + ": " + entry.getValue());    }  }}2.3.4 WordCountTopology构建

类WordCountTopology是真正定义Topology的地方,其代码如下所示:1 public class WordCountTopology {2  public static void main(String[] args) throws Exception {3    TopologyBuilder builder = new TopologyBuilder();4    builder.setSpout("spout", new RandomSentenceSpout(), 5);5    builder.setBolt("split", new SplitSentence(), 8)6       .shuffleGrouping("spout");7     builder.setBolt("count", new WordCount(), 12)8       .fieldsGrouping("split", new Fields("word"));910    Config conf = new Config();11    conf.setDebug(true);1213    if(args!=null && args.length > 0) {14      conf.setNumWorkers(3);15      StormSubmitter.submitTopology("WordCountTopology", conf, builder.createTopology());16    } else {17      conf.setMaxTaskParallelism(3);1819      LocalCluster cluster = new LocalCluster();20      cluster.submitTopology("WordCountTopology", conf, builder.createTopology());21      Thread.sleep(10000);22      cluster.shutdown();23    }24  }25 }

●第3行创建一个TopologyBuilder对象,这个类是用来构建基本Topology的,后面我们会详细介绍它。

●第4行设置Topology的Spout,它的id是spout,这里创建一个RandomSentenceSpout对象作为Spout对象,并行度设置为5。

●第5~6行设置Topology的Bolt,它的id是split,这里创建一个SplitSentence对象作为Bolt对象,并行度设置为8。它接收spout发出的消息,其分组策略是随机分组(Shuffle Grouping),即spout的多个实例会随机分发消息到split的各个实例上。

●第7~8行设置Topology的另一个Bolt,它的id是count,这里创建一个WordCount对象作为Bolt对象,并行度设置为12。它接收split发出的消息,其分组策略是域分组(Fields Grouping),即split的各个实例会按照消息word列所对应的值决定将消息发送到count的哪个实例中。所有word列值相同的消息会被发到同一个count节点中处理。

●第10~11行设置该Topology所用的配置信息,这里仅设置了调试模式为真。这样系统会打印所有发送及接收的消息。

●第13~15行是在集群上提交该Topology,我们前面介绍的单机跟多机环境都是用这种方式运行的。

●第17~22行是直接运行该Topology,但它不会提交到真实的集群上。Storm提供了一个LocalCluster对象来模拟集群运行环境,它采用线程模拟进程的方式实现,一般用于调试写好的Topology。第3章Storm编程基础

本章将介绍一些Storm中的基础概念,以及相关类和接口的用法。另外,在网站GitHub的wiki页面上(https://github.com/nathanmarz/storm/wiki/Concepts),还专门提供了有关Storm核心概念的简要描述,读者可以将其与本章的讲解对照起来进行学习。3.1Fields定义

Fields数据结构用于存储消息的字段名列表,其所需参数是字段名集合。对于同一条消息,在构建Fields对象时会为其所有的字段建立索引。它的定义如下:1 public class Fields implements Iterable, Serializable {2  private List _fields;3  private Map _index = new HashMap();45  public Fields(String…… fields) {6    this(Arrays.asList(fields));7  }89  public Fields(List fields) {10    _fields = new ArrayList(fields.size());11    for (String field : fields) {12      if (_fields.contains(field))13        throw new IllegalArgumentException(14          String.format("duplicate field '%s'", field)15        );16      _fields.add(field);17    }18    index();19  }20  ……21 }

●第1行表明Fields类实现了接口Iterable和Serializable。接口Iterable定义了一个迭代器接口,用于遍历Fields中存储的字段名列表;接口Serializable则表明该类是可以被序列化的。

●第2~3行分别定义了一个保存所有字段名的列表,以及一个保存了从字段名到它在字段名列表中位置的映射表。

●第5~7行的构造函数接收一个可变参数fields(也即一个字段名数组),将fields转换为列表后调用第9~19行定义的构造函数。

●第9~19行定义的构造函数会首先检查传入的字段名列表中的字段名是否有重复,并保存该字段名列表,最后调用index方法为该字段名列表建立索引。

index方法实际上就是遍历字段名列表,将每个字段名和它对应的位置保存到第3行定义的映射表中,其代码如下:1 private void index() {2  for(int i=0; i<_fields.size(); i++) {3    _index.put(_fields.get(i), i);4  }5 }

此外,Fields数据结构中还定义了很多常用方法,比如获取所有字段名、获取字段名列表的大小、获取指定位置的字段名、获取某个字段名的索引位置以及获取一个所有字段名的迭代器等。这些方法都相对简单易懂,限于篇幅,此处不再赘述。3.2Tuple接口

Tuple是Storm中的主要数据结构。在Storm发送接收消息的过程中,每一条消息实际上都是一个Tuple对象。下面首先来看一下Tuple接口的定义:1 public interface Tuple {2  public int size();3  public int fieldIndex(String field);4  public boolean contains(String field);56  public Object getValue(int i);7  public String getString(int i);8  public Integer getInteger(int i);9  public Long getLong(int i);10  public Boolean getBoolean(int i);11  public Short getShort(int i);12  public Byte getByte(int i);13  public Double getDouble(int i);14  public Float getFloat(int i);15  public byte[] getBinary(int i);1617  public Object getValueByField(String field);18  public String getStringByField(String field);19  public Integer getIntegerByField(String field);20  public Long getLongByField(String field);21  public Boolean getBooleanByField(String field);22  public Short getShortByField(String field);23  public Byte getByteByField(String field);24  public Double getDoubleByField(String field);25  public Float getFloatByField(String field);26  public byte[] getBinaryByField(String field);2728  public List getValues();29  public Fields getFields();30  public List select(Fields selector);3132  public GlobalStreamId getSourceGlobalStreamid();33  public String getSourceComponent();34  public int getSourceTask();35  public String getSourceStreamId();36  public MessageId getMessageId();37 }

●第2行的size方法返回当前消息中字段的数目。

●第3行的fieldIndex方法可根据传入的字段名获取该字段在所有字段中所处的位置。

●第4行的contains方法用来判断该消息是否包含指定的字段。

●第6~15行的方法用于获取由参数i指定的字段位置的值。如果用户知道该字段对应的类型,就可调用对应类型的获取方法获取字段的值。若字段的类型跟获取方法不匹配,将发生异常。

●第17~26行跟第6~15行类似,只不过这里的方法是根据字段名获取对应的值。定义这些重载方法的目的都是为了提高消息处理的性能。

●第28行的getValues方法用来获取该消息存储的值列表。

●第29行的getFields方法用来获取存储了所有字段名的Fields对象。

●第30行的select方法用来获取由参数Fields指定的与字段名对应的值列表。

●第32行用来获取与该消息对应的GlobalStreamId,后面我们会介绍这个类。

●第33行用来获取创建这个消息的组件id。

●第34行用来获取创建这个消息的TaskId,第11章将专门介绍Task。

●第35行用来获取该消息被发送到的流的序号。

●第36行用来获取该Tuple的消息序号,该序号会被Storm用来追踪消息是否处理成功,详细内容请参考第12章。

Storm提供了Tuple的默认实现类TupleImpl。它除了实现Tuple接口之外,还实现了Clojure定义的几个接口Seqable、Indexed和IMeta,实现这些接口的目的是为了在Clojure代码中能更好地操纵Tuple对象。TupleImpl的实现比较容易理解,用户可以自行查看代码,这里不再赘述。3.3常用声明接口

Storm中有多个与组件声明相关的类,它们的主要作用是让用户更加方便地定义组件的输入输出,以及一些与组件相关的配置,其类关系如图3-1所示。图3-1 常用声明接口3.3.1 配置声明接口

ComponentConfigurationDeclarer接口定义了一些和组件相关的配置项。该接口中定义的方法返回值为通用类型T,且T实现了ComponentConfigurationDeclarer。由于该接口中的这些方法返回的是相同的对象,因此在用法上可以实现方法的级联。该接口的定义如下。public interface ComponentConfigurationDeclarer {  T addConfigurations(Map conf);  T addConfiguration(String config, Object value);  T setDebug(boolean debug);  T setMaxTaskParallelism(Number val);  T setMaxSpoutPending(Number val);  T setNumTasks(Number val);}

Storm默认提供了一个抽象类BaseConfigurationDeclarer,它实现了以上接口中除addConfi gurations(Map conf)以外的大部分方法:public abstract class BaseConfigurationDeclarer implements ComponentConfigurationDeclarer {  @Override  public T addConfiguration(String config, Object value) {

试读结束[说明:试读内容隐藏了图片]

下载完整电子书


相关推荐

最新文章


© 2020 txtepub下载