RocketMQ实战与原理解析(txt+pdf+epub+mobi电子书下载)


发布时间:2020-06-19 14:26:45

点击下载

作者:杨开元

出版社:机械工业出版社

格式: AZW3, DOCX, EPUB, MOBI, PDF, TXT

RocketMQ实战与原理解析

RocketMQ实战与原理解析试读:

前言

为什么要写这本书

几年前在做一个项目的时候,若需要用到消息队列,简单调研一下就会决定用Kafka,因为当时还不知道有RocketMQ。在我加入阿里后,当时有个项目需要用到消息中间件,试用了RocketMQ,发现阿里开源的消息中间件性能非常强大,但是上手有点费劲,因为现有文档多是零零散散的博文。在没有合适文档指导的情况下,对系统中用到的RocketMQ模块心里没底,系统偶尔出现异常时总会束手无策,需要通过查看很多源码,才能保证系统的稳定运行。

熟悉RocketMQ以后,我发现它是一款非常优秀的中间件产品,可以确保不丢消息,而且效率很高。同时因为它是用Java开发的,所以修改起来比较容易。

在阿里内部,RocketMQ很好地服务了集团大大小小上千个应用,在每年的双十一当天,更有不可思议的万亿级消息通过RocketMQ流转(在2017年的双11当天,整个阿里巴巴集团通过RocketMQ流转的线上消息达到了万亿级,峰值TPS达到5600万),在阿里大中台策略上发挥着举足轻重的作用。所以如果有合适的参考文档,RocketMQ会被更多人接受和使用,让更多人不必重复造“轮子”。

我做了很多年开发,在学校课本上学的开发知识有限,大多数是通过看书和上网学到的,其中很多优秀的文章对自己帮助很大。所以我很希望能用这本书回馈技术社区中有需要的开发者们。

动笔写这本书前,我系统地阅读了RocketMQ的源码,有些理解不够透彻的地方请教了阿里RocketMQ开发团队的同事,然后也总结了自己多年实际工作中的一些经验。希望这本书能简明扼要地说清楚RocketMQ的使用方法和核心原理。读者对象

·希望学习分布式系统或分布式消息队列的开发人员。

·服务端系统开发者,他们可以借助高质量中间件来提高开发效率。

·软件架构师,他们可以通过消息队列优化复杂系统的设计。本书特色

本书系统地介绍了RocketMQ这款优秀的分布式消息队列软件,通过阅读本书,读者可以快速把RocketMQ应用到自己的项目中,也可以通过更改源码定制符合自身业务的消息中间件。如何阅读本书

本书分为两大部分:

第一部分是RocketMQ实战,包括第1~8章。这是本书的主体内容,可帮助读者快速用好RocketMQ这个分布式消息队列。

这部分是按照由浅入深的方式撰写的,为了让读者快速上手,首先介绍了搭建一个简单RocketMQ集群的方法,以此来发送和接收消息;然后详细介绍了如何用好Consumer和Producer,如何选择合适的类以及进行参数设置;再进一步根据应用,说明如何让RocketMQ在各种异常情况下保持稳定可靠,以及如何增大RocketMQ的吞吐量,从而在单位时间内处理更多的消息。

第二部分是源码分析,包括第9~13章。当读者有特殊的业务需求,需要更改或扩展RocketMQ现有功能的时候,这部分内容能帮助读者快速熟悉源码,找到要下手更改的地方,快速实现想要的功能。

这部分也适合想通过源码,深入学习消息队列的读者阅读。学习别人优秀的代码是提升自己技术水平的一条有效途径。勘误和支持

由于水平有限,编写时间仓促,书中难免会出现一些错误或者不准确的地方,恳请读者批评指正。有任何的意见或建议,都可以通过邮箱rocketmqqa@163.com和我联系,真挚期待你的反馈。致谢

写技术书籍很耗费时间,加之互联网行业快节奏的工作方式,导致我写这本书的时间大多是在周末和夜晚。在此感谢家人对我的支持和理解,尤其感谢我的妻子,没有她对家庭的照顾和对我的鼓励,这本书是无法完成的。

感谢阿里消息中间件团队的Leader王小瑞,是你从技术和写作思路上给我很大的帮助。感谢消息中间件团队的其他同学,你们为开源社区贡献了一个高质量的软件,你们写的很多高质量博文使开发者更容易理解RocketMQ。

感谢机械工业出版社的编辑杨福川、张锡鹏,感谢云栖社区的刁云怡,阿里的校友耿嘉安,是你们始终支持我的写作,你们的引导和帮助使我能顺利完成全部书稿。

谨以本书献给我最亲爱的家人,以及众多热爱软件开发工作的朋友们!杨开元第1章快速入门

本章可以让读者了解RocketMQ和分布式消息队列的功能,然后搭建好单机版的消息队列,进而能够发送并接收简单的消息。1.1 消息队列功能介绍

简单来说,消息队列就是基础数据结构课程里“先进先出”的一种数据结构,但是如果要消除单点故障,保证消息传输的可靠性,并且还能应对大流量的冲击,对消息队列的要求就很高了。现在互联网“微架构”模式兴起,原有大型集中式的IT服务因为各种弊端,通常被分拆成细粒度的多个“微服务”,这些微服务可以在一个局域网内,也可能跨机房部署。一方面对服务之间松耦合的要求越来越高,另一方面,服务之间的联系却越来越紧密,对通信质量的要求也越来越高。分布式消息队列可以提供应用解耦、流量消峰、消息分发等功能,已经成为大型互联网服务架构里标配的中间件。1.1.1 应用解耦

复杂的应用里会存在多个子系统,比如在电商应用中有订单系统、库存系统、物流系统、支付系统等。这个时候如果各个子系统之间的耦合性太高,整体系统的可用性就会大幅降低。多个低错误率的子系统强耦合在一起,得到的是一个高错误率的整体系统。

以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。

如图1-1所示,当转变成基于消息队列的方式后,系统可用性就高多了,比如物流系统因为发生故障,需要几分钟的时间来修复,在这几分钟的时间里,物流系统要处理的内容被缓存在消息队列里,用户的下单操作可以正常完成。当物流系统恢复后,补充处理存储在消息队列里的订单信息即可,终端用户感知不到物流系统发生过几分钟的故障。图1-1 消息队列的解耦功能1.1.2 流量消峰

每年的双十一,淘宝的很多活动都在0点的时候开启,大部分应用系统流量会在瞬间猛增,这个时候如果没有缓冲机制,不可能承受住短时大流量的冲击。通过利用消息队列,把大量的请求暂存起来,分散到相对长的一段时间内处理,能大大提高系统的稳定性和用户体验。

举个例子,如果订单系统每秒最多能处理一万次下单,这个处理能力应对正常时段的下单是绰绰有余的,正常时段我们下单后一秒内就能返回结果。在双十一零点的时候,如果没有消息队列这种缓冲机制,为了保证系统稳定,只能在订单超过一万次后就不允许用户下单了;如果有消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单后十几秒才能收到下单成功的状态,但是也比不能下单的体验要好。

使用消息队列进行流量消峰,很多时候不是因为能力不够,而是出于经济性的考量。比如有的业务系统,流量最高峰也不会超过一万QPS,而平时只有一千左右的QPS。这种情况下我们就可以用个普通性能的服务器(只支持一千左右的QPS就可以),然后加个消息队列作为高峰期的缓冲,无须花大笔资金部署能处理上万QPS的服务器。1.1.3 消息分发

在大数据时代,数据对很多公司来说就像金矿,公司需要依赖对数据的分析,进行用户画像、精准推送、流程优化等各种操作,并且对处理的实时性要求越来越高。数据是不断产生的,各个分析团队、算法团队都要依赖这些数据来进行工作,这个时候有个可持久化的消息队列就非常重要。数据的产生方只需要把各自的数据写入一个消息队列即可,数据使用方根据各自需求订阅感兴趣的数据,不同数据团队所订阅的数据可以重复也可以不重复,互不干扰,也不必和数据产生方关联。

如图1-2所示,各个子系统将日志数据不停地写入消息队列,不同的数据处理系统有各自的Offset,互不影响。甚至某个团队处理完的结果数据也可以写入消息队列,作为数据的产生方,供其他团队使用,避免重复计算。在大数据时代,消息队列已经成为数据处理系统不可或缺的一部分。

除了上面列出的应用解耦、流量消峰、消息分发等功能外,消息队列还有保证最终一致性、方便动态扩容等功能。图1-2 消息队列的消息分发功能1.2 RocketMQ简介

阿里的消息中间件有很长的历史,从2007年的Notify到2010年的Napoli,2011年升级后改为MetaQ,然后到2012年开始做RocketMQ,RocketMQ使用Java语言开发,于2016年开源。第一代的Notify主要使用了推模型,解决了事务消息;第二代的MetaQ主要使用了拉模型,解决了顺序消息和海量堆积的问题。RocketMQ基于长轮询的拉取方式,兼有两者的优点。

每一次产品迭代,都吸取了之前的经验教训,目前RocketMQ已经成为Apache顶级项目。在阿里内部,RocketMQ很好地服务了集团大大小小上千个应用,在每年的双十一当天,更有不可思议的万亿级消息通过RocketMQ流转(在2017年的双十一当天,整个阿里巴巴集团通过RocketMQ流转的线上消息达到了万亿级,峰值TPS达到5600万),在阿里大中台策略上发挥着举足轻重的作用。

此外,RocketMQ是使用Java语言开发的,比起Kafka的Scala语言和RabbitMQ的Erlang语言,更容易找到技术人员进行定制开发。1.3 快速上手RocketMQ

本节介绍如何安装配置单机版的RocketMQ,以及简单地收发消息。读者也可以参考RocketMQ官网的说明文档。1.3.1 RocketMQ的下载、安装和配置

RocketMQ的Binary版是一些编译好的jar和辅助的shell脚本,可以直接从官网找到下载链接(http://rocketmq.apache.org/dowloading/releases/),也可以下载源码自己编译。

系统要求:64bit的Linux、Unix或Mac。Java版本大于等于JDK1.8。如果需要从GitHub上下载源码和编译的话,需要安装Maven 3.2.x和Git。

RocketMQ当前的最新版本是4.2.0,下面以Binary版本为例说明如何快速使用:> unzip rocketmq-all-4.2.0-bin-release.zip -d ./rocketmq-all-4.2.0-binls> cd rocketmq-all-4.2.0-bin/

里面含有以下内容:LICENSE NOTICE README.md benchmark/ bin/ conf/ lib/

LICENSE、NOTICE和README.md包括一些版权声明和功能说明信息;benchmark里包括运行benchmark程序的shell脚本;bin文件夹里含有各种使用RocketMQ的shell脚本(Linux平台)和cmd脚本(Windows平台),比如常用的启动NameServer的脚本mqnamesrv,启动Broker的脚本mqbroker,集群管理脚本mqadmin等;conf文件夹里有一些示例配置文件,包括三种方式的broker配置文件、logback日志配置文件等,用户在写配置文件的时候,一般基于这些示例配置文件,加上自己特殊的需求即可;lib文件夹里包括RocketMQ各个模块编译成的jar包,以及RocketMQ依赖的一些jar包,比如Netty、commons-lang、FastJSON等。1.3.2 启动消息队列服务

启动单机的消息队列服务比较简单,不需要写配置文件,只需要依次启动本机的NameServer和Broker即可。

启动NameServer:> nohup sh bin/mqnamesrv &> tail -f ~/Logs/rocketmqLogs/namesrv.LogThe Name Server boot success...

启动Broker:> nohup sh bin/mqbroker –n localhost:9876&> tail -f ~/Logs/rocketmqLogs/broker.LogThe broker[%s, 192.168.0.233:10911] boot success...1.3.3 用命令行发送和接收消息

为了快速展示发送和接收消息,本节展示的是用命令行发送和接收消息,实际上就是运行写好的demo程序,后续我们可以参考这些demo来写自己的发送和接收程序。

运行示例程序,发送和接收消息: > export NAMESRV_ADDR=localhost:9876 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.ProducerSendResult [sendStatus=SEND_OK, msgId= ...> sh bin/tools.sh org.apache.rocketmq.example.quickstart.ConsumerConsumeMessageThread_%d Receive New Messages: [MessageExt...1.3.4 关闭消息队列

消息队列被启动后,如果不主动关闭,则会一直在后台运行,占用系统资源。我们有专门用来关闭NameServer和Broker的命令。

关闭NameServer和Broker:> sh bin/mqshutdown brokerThe mqbroker(36695) is running...Send shutdown request to mqbroker(36695) OK> sh bin/mqshutdown namesrvThe mqnamesrv(36664) is running...Send shutdown request to mqnamesrv(36664) OK

恭喜,现在你已经能够使用RocketMQ发送并接收消息了,使用消息队列的基本功能就是这么简单。1.4 本章小结

本章介绍了消息队列的功能,以及RocketMQ这个消息队列从阿里诞生的历史。然后基于快速上手的目的,本章直接给出了一些命令示例,读者跟着操作即可快速启动一个RocketMQ服务,并且可以尝试发送和接收简单的消息。有了本章的初步体验后,下一章将介绍如何在生产环境使用RocketMQ。第2章生产环境下的配置和使用

本章的目的是带领读者快速将RocketMQ应用到生产环境中,因此不会探究原理和细节。本章会先介绍RocketMQ的各个角色,然后介绍如何搭建一个高可用的分布式消息队列集群,以及RocketMQ的Consumer和Producer的使用方法与常用命令。2.1 RocketMQ各部分角色介绍

RocketMQ由四部分组成,先来直观地了解一下这些角色以及各自的功能。分布式消息队列是用来高效地传输消息的,它的功能和现实生活中的邮局收发信件很类似,我们类比地说一下相应的模块。现实生活中的邮政系统要正常运行,离不开下面这四个角色,一是发信者,二是收信者,三是负责暂存、传输的邮局,四是负责协调各个地方邮局的管理机构。对应到RocketMQ中,这四个角色就是Producer、Consumer、Broker和NameServer。

启动RocketMQ的顺序是先启动NameServer,再启动Broker,这时候消息队列已经可以提供服务了,想发送消息就使用Producer来发送,想接收消息就使用Consumer来接收。很多应用程序既要发送,又要接收,可以启动多个Producer和Consumer来发送多种消息,同时接收多种消息。

为了消除单点故障,增加可靠性或增大吞吐量,可以在多台机器上部署多个NameServer和Broker,为每个Broker部署一个或多个Slave。图2-1 RocketMQ各个角色间关系

了解了四种角色以后,再介绍一下Topic和Message Queue这两个名词。一个分布式消息队列中间件部署好以后,可以给很多个业务提供服务,同一个业务也有不同类型的消息要投递,这些不同类型的消息以不同的Topic名称来区分。所以发送和接收消息前,先创建Topic,针对某个Topic发送和接收消息。有了Topic以后,还需要解决性能问题。如果一个Topic要发送和接收的数据量非常大,需要能支持增加并行处理的机器来提高处理速度,这时候一个Topic可以根据需求设置一个或多个Message Queue,Message Queue类似分区或Partition。Topic有了多个Message Queue后,消息可以并行地向各个Message Queue发送,消费者也可以并行地从多个Message Queue读取消息并消费。2.2 多机集群配置和部署

本节将说明如何只用两台物理机,搭建出双主、双从、无单点故障的高可用RocketMQ集群。假设这两台物理机的IP分别是192.168.100.131和192.168.100.132。2.2.1 启动多个NameServer和Broker

首先在这两台机器上分别启动NameServer(nohup sh bin/mqnamesrv &),这样我们就得到了一个无单点的NameServer服务,服务地址是“192.168.100.131:9876;192.168.100.132:9876”。

然后启动Broker,每台机器上都要分别启动一个Master角色的Broker和一个Slave角色的Broker,并互为主备。可以基于RocketMQ自带的示例配置文件写自己的配置文件(示例配置文件在conf/2m-2s-sync目录下)。

1)192.168.100.131机器上Master Broker的配置文件: namesrvAddr=192.168.100.131:9876; 192.168.100.132:9876 brokerClusterName=DefaultCluster brokerName=broker-a brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=SYNC_MASTER flushDiskType=ASYNC_FLUSH listenPort=10911 storePathRootDir=/home/rocketmq/store-a

2)192.168.100.132机器上Master Broker的配置文件: namesrvAddr=192.168.100.131:9876; 192.168.100.132:9876 brokerClusterName=DefaultCluster brokerName=broker-b brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=SYNC_MASTER flushDiskType=ASYNC_FLUSH listenPort=10911 storePathRootDir=/home/rocketmq/store-b

3)192.168.100.131机器上Slave Broker的配置文件: namesrvAddr=192.168.100.131:9876; 192.168.100.132:9876 brokerClusterName=DefaultCluster brokerName=broker-b brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH listenPort=11011 storePathRootDir=/home/rocketmq/store-b

4)192.168.100.132机器上Slave Broker的配置文件: namesrvAddr=192.168.100.131:9876; 192.168.100.132:9876 brokerClusterName=DefaultCluster brokerName=broker-a brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH listenPort=11011 storePathRootDir=/home/rocketmq/store-a

然后分别使用如下命令启动四个Broker: nohup sh ./bin/mqbroker -c config_file &

这样一个高可用的RocketMQ集群就搭建好了,还可以在一台机器上启动rocketmq-console,比如在192.168.100.131上启动RocketMQ-console,然后在浏览器中输入地址192.168.100.131:8080,这样就可以可视化地查看集群状态了。2.2.2 配置参数介绍

本节将逐个介绍Broker配置文件中用到的参数含义:

1)namesrvAddr=192.168.100.131:9876;192.168.100.132:9876

NamerServer的地址,可以是多个。

2)brokerClusterName=DefaultCluster

Cluster的地址,如果集群机器数比较多,可以分成多个Cluster,每个Cluster供一个业务群使用。

3)brokerName=broker-a

Broker的名称,Master和Slave通过使用相同的Broker名称来表明相互关系,以说明某个Slave是哪个Master的Slave。

4)brokerId=0

一个Master Borker可以有多个Slave,0表示Master,大于0表示不同Slave的ID。

5)fileReservedTime=48

在磁盘上保存消息的时长,单位是小时,自动删除超时的消息。

6)deleteWhen=04

与fileReservedTime参数呼应,表明在几点做消息删除动作,默认值04表示凌晨4点。

7)brokerRole=SYNC_MASTER

brokerRole有3种:SYNC_MASTER、ASYNC_MASTER、SLAVE。关键词SYNC和ASYNC表示Master和Slave之间同步消息的机制,SYNC的意思是当Slave和Master消息同步完成后,再返回发送成功的状态。

8)flushDiskType=ASYNC_FLUSH

flushDiskType表示刷盘策略,分为SYNC_FLUSH和ASYNC_FLUSH两种,分别代表同步刷盘和异步刷盘。同步刷盘情况下,消息真正写入磁盘后再返回成功状态;异步刷盘情况下,消息写入page_cache后就返回成功状态。

9)listenPort=10911

Broker监听的端口号,如果一台机器上启动了多个Broker,则要设置不同的端口号,避免冲突。

10)storePathRootDir=/home/rocketmq/store-a

存储消息以及一些配置信息的根目录。

这些配置参数,在Broker启动的时候生效,如果启动后有更改,要重启Broker。现在使用云服务或多网卡的机器比较普遍,Broker自动探测获得的ip地址可能不符合要求,通过brokerIP1=47.98.41.234这样的配置参数,可以设置Broker机器对外暴露的ip地址。2.3 发送/接收消息示例

可以用自己熟悉的开发工具创建一个Java项目,加入RocketMQ Client包的依赖,用代码清单2-1的内容发送消息,这个示例代码是以Sync方式发送消息的。

代码清单2-1 Producer示例程序public class SyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a Producer group name. DefaultMQProducer Producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("192.168.100.131:9876"); //Launch the instance. Producer.start(); for (int i = 0; i < 100; i++) { //Create a Message instance, specifying Topic, tag and Message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send Message to deliver Message to one of brokers. SendResult sendResult = Producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the Producer instance is not longer in use. Producer.shutdown(); }}

主要流程是:创建一个DefaultMQProducer对象,设置好GroupName和NameServer地址后启动,然后把待发送的消息拼装成Message对象,使用Producer来发送。接下来看看如何接收消息,也就是使用DefaultMQPush-Consumer类实现的消费者程序,如代码清单2-2所示。

代码清单2-2 Consumer示例程序 /* * Instantiate with specified Consumer group name. */ DefaultMQPushConsumer Consumer = new DefaultMQPushConsumer("please rename to unique group name"); /* * Specify name server addresses. Consumer.setNamesrvAddr("192.168.249.47:9876"); /* * Specify where to start in case the specified Consumer group is a brand new one. */Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //Consumer.setMessageModel(MessageModel.BROADCASTING); /* * Subscribe one more more Topics to consume. */ Consumer.subscribe("TopicTest”, "*"); /* * Register callback to execute on arrival of Messages fetched from brokers. */ Consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /* * Launch the Consumer instance. */ Consumer.start();

Consumer或Producer都必须设置GroupName、NameServer地址以及端口号。然后指明要操作的Topic名称,最后进入发送和接收逻辑。2.4 常用管理命令

MQAdmin是RocketMQ自带的命令行管理工具,在bin目录下,运行mqadmin即可执行。使用mqadmin命令,可以进行创建、修改Topic,更新Broker的配置信息,查询特定消息等各种操作。本节将介绍几个常用的命令。

1.创建/修改Topic

消息的发送和接收都要有对应的Topic,需要向某个Topic发送或接收消息,所以在正式使用RocketMQ进行消息发送和接收前,要先创建Topic,创建Topic的指令是updateTopic,表2-1列出了支持的参数。表2-1 updateTopic

2.删除Topic

与创建/修改Topic对应的是删除Topic,把RocketMQ系统中不用的Topic彻底清除,指令是deleteTopic,表2-2列出了支持的参数。表2-2 deleteTopic

3.创建/修改订阅组

订阅组在提高系统的高可用性和吞吐量方面扮演着重要的角色,比如用Clustering模式消费一个Topic里的消息内容时,可以启动多个消费者并行消费,每个消费者只消费Topic里消息的一部分,以此提高消费速度,这个时候就是通过订阅组来指明哪些消费者是同一组,同一组的消费者共同消费同一个Topic里的内容。订阅组可以被自动创建,使用这个命令一般是用来修改订阅组,指令是updateSubGroup,表2-3列出了支持的参数。表2-3 updateSubGroup

4.删除订阅组

与创建或修改订阅组相对应,这个命令删除不再使用的订阅组,指令是deleteSubGroup,表2-4列出了支持的参数。表2-4 deleteSubGroup

5.更新Broker配置

Broker有很多的配置信息,在Broker启动时,可以通过配置文件来指定配置信息。有些配置信息支持在Broker运行的时候动态更改,更改指令是updateBrokerConfig,表2-5列出了支持的参数。表2-5 updateBrokerConfig

6.更新Topic的读写权限

RocketMQ支持对Topic进行权限控制,主要分为只读的Topic和可读写的Topic,权限可以通过指令updateTopicPerm来动态改变,表2-6列出了支持的参数。表2-6 updateTopicPerm

7.查询Topic的路由信息

Topic的路由信息指的是某个Topic所在的Broker相关信息,客户端可以通过NameServer来获取这些信息,本命令一般在调试的时候使用,指令是TopicRoute,表2-7列出了支持的参数。表2-7 TopicRoute

8.查看Topic列表信息

上面提到的TopicRoute是列出某个Topic的相关信息,还有个指令TopicList用来列出集群中所有Topic的名称,表2-8列出了支持的参数。表2-8 TopicList

9.查看Topic统计信息

在使用RocketMQ的时候,经常需要查看某个Topic的状态,看看消息的数量,有多少未处理等,此时可以通过指令TopicStats来查询,表2-9列出了支持的参数。表2-9 TopicStats

10.根据时间查询消息

一条消息被发送到RocketMQ后,默认会带上发送的时间戳,所以我们可以根据估计的时间来查询消息,指令是printMsg,表2-10列出了支持的参数。表2-10 printMsg

11.根据消息ID查询消息

根据消息ID可以精确定位到某条消息,但是消息ID需要通过其他方式来获取,比如可以先用时间来查询出一些消息,然后定位到要找的具体某个消息,指令是queryMsgById,表2-11列出了支持的参数。表2-11 queryMsgById

12.查看集群消息

指令clusterList用来列出集群的状态,看看有哪些Broker在提供服务,表2-12列出了支持的参数。表2-12 clusterList2.5 通过图形界面管理集群

对于RocketMQ新手,可以启动运维服务,从页面上直观看到消息队列集群的状态。有一定经验以后,可以使用命令行更快捷,其功能更全面。

运维服务程序是个SpringBoot项目,需要从GitHub上的apache/rocketmq-externals里下载源码(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console)。

进入下载源码的目录,运行如下命令即可启动:mvn spring-boot:run

也可以编译成jar包,通过java-jar来执行。

服务启动后,在浏览器里访问server_ip_address:8080(server_ip_address是启动rocketmq-console的机器IP)地址就可看到集群的状态。图2-2 rocketmq-console页面2.6 本章小结

在生产环境中使用RocketMQ集群需要比QuickStart部分了解更多的内容,本章在机器角色、集群配置和部署,以及集群管理方面都做了介绍,用户可以基于这些内容搭建起一个生成环境的RocketMQ消息队列集群,在数据量不大的非关键场景,可以通过这一章快速上线。下一章重点讲如何用好RocketMQ,即根据实际场景选择合适的发送消息和接收消息的方式。第3章用适合的方式发送和接收消息

生产者和消费者是消息队列的两个重要角色,生产者向消息队列写入数据,消费者从消息队列里读取数据,RocketMQ的大部分用户只需要和生产者、消费者打交道。本章具体介绍不同类型生产者和消费者的特点,以及和它们相关的Offset和Log。3.1 不同类型的消费者

根据使用者对读取操作的控制情况,消费者可分为两种类型。一个是DefaultMQPushConsumer,由系统控制读取操作,收到消息后自动调用传入的处理方法来处理;另一个是DefaultMQPullConsumer,读取操作中的大部分功能由使用者自主控制。3.1.1 DefaultMQPushConsumer的使用

使用DefaultMQPushConsumer主要是设置好各种参数和传入处理消息的函数。系统收到消息后自动调用处理函数来处理消息,自动保存Offset,而且加入新的DefaultMQPushConsumer后会自动做负载均衡。下面结合org.apache.rocketmq.example.quickstart包中的源码来介绍,如代码清单3-1所示。

代码清单3-1 DefaultMQPushConsumer示例public class QuickStart { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer Consumer = new DefaultMQPushConsumer ("please_rename_unique_group_name_4");Consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); Consumer.setMessageModel(MessageModel.BROADCASTING); Consumer.subscribe("TopicTest", "*"); Consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); Consumer.start(); }}

DefaultMQPushConsumer需要设置三个参数:一是这个Consumer的GroupName,二是NameServer的地址和端口号,三是Topic的名称,下面将分别进行详细介绍。

1)Consumer的GroupName用于把多个Consumer组织到一起,提高并发处理能力,GroupName需要和消息模式(MessageModel)配合使用。

RocketMQ支持两种消息模式:Clustering和Broadcasting。

·在Clustering模式下,同一个ConsumerGroup(GroupName相同)里的每个Consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消费的内容合起来才是所订阅Topic内容的整体,从而达到负载均衡的目的。

·在Broadcasting模式下,同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部消息,也就是一个消息会被多次分发,被多个Consumer消费。

2)NameServer的地址和端口号,可以填写多个,用分号隔开,达到消除单点故障的目的,比如“ip1:port;ip2:port;ip3:port”。

3)Topic名称用来标识消息类型,需要提前创建。如果不需要消费某个Topic下的所有消息,可以通过指定消息的Tag进行消息过滤,比如:Consumer.subscribe("TopicTest","tag1||tag2||tag3"),表示这个Consumer要消费“TopicTest”下带有tag1或tag2或tag3的消息(Tag是在发送消息时设置的标签)。在填写Tag参数的位置,用null或者“*”表示要消费这个Topic的所有消息。3.1.2 DefaultMQPushConsumer的处理流程

本节通过分析源码来说明DefaultMQPushConsumer的处理流程。

DefaultMQPushConsumer主要功能实现在DefaultMQPushConsumerImpl类中,消息的处理逻辑是在pullMessage这个函数里的PullCallBack中。在PullCallBack函数里有个switch语句,根据从Broker返回的消息类型做相应的处理,具体处理逻辑可以查看源码,如代码清单3-2所示。

代码清单3-2 DefaultMQPushConsuer的处理逻辑switch (pullResult.getPullStatus()) { case FOUND: …… break; case NO_NEW_MSG: …… break; case OFFSET_ILLEGAL: …… break; default: break;}

DefaultMQPushConsuer的源码中有很多PullRequest语句,比如Default-MQPushConsumerImpl.this.executePullRequestImmediately(pullRequest)。为什么“PushConsumer”中使用“PullRequest”呢?这是通过“长轮询”方式达到Push效果的方法,长轮询方式既有Pull的优点,又兼具Push方式的实时性。

Push方式是Server端接收到消息后,主动把消息推送给Client端,实时性高。对于一个提供队列服务的Server来说,用Push方式主动推送有很多弊端:首先是加大Server端的工作量,进而影响Server的性能;其次,Client的处理能力各不相同,Client的状态不受Server控制,如果Client不能及时处理Server推送过来的消息,会造成各种潜在问题。

Pull方式是Client端循环地从Server端拉取消息,主动权在Client手里,自己拉取到一定量消息后,处理妥当了再接着取。Pull方式的问题是循环拉取消息的间隔不好设定,间隔太短就处在一个“忙等”的状态,浪费资源;每个Pull的时间间隔太长,Server端有消息到来时,有可能没有被及时处理。“长轮询”方式通过Client端和Server端的配合,达到既拥有Pull的优点,又能达到保证实时性的目的。我们结合源码来分析,如代码清单3-3和3-4所示。

代码清单3-3 发送Pull消息代码片段PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();requestHeader.setConsumerGroup(this.ConsumerGroup);requestHeader.setTopic(mq.getTopic());requestHeader.setQueueId(mq.getQueueId());requestHeader.setQueueOffset(Offset);requestHeader.setMaxMsgNums(maxNums);requestHeader.setSysFlag(sysFlagInner);requestHeader.setCommitOffset(commitOffset);requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);requestHeader.setSubscription(subExpression);requestHeader.setSubVersion(subVersion);requestHeader.setExpressionType(expressionType);------PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);

源码中有这一行设置语句requestHeader.setSuspendTimeoutMillis(brokerSus-pendMaxTimeMillis),作用是设置Broker最长阻塞时间,默认设置是15秒,注意是Broker在没有新消息的时候才阻塞,有消息会立刻返回。

代码清单3-4 “长轮询”服务端代码片段package org.apache.rocketmq.broker.longpolling------if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000);} else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());}long beginLockTimestamp = this.systemClock.now();this.checkHoldRequest();long costTime = this.systemClock.now() - beginLockTimestamp;if (costTime > 5 * 1000) { Log.info("[NOTIFYME] check hold request cost {} ms.", costTime);}

从Broker的源码中可以看出,服务端接到新消息请求后,如果队列里没有新消息,并不急于返回,通过一个循环不断查看状态,每次waitForRunning一段时间(默认是5秒),然后后再Check。默认情况下当Broker一直没有新消息,第三次Check的时候,等待时间超过Request里面的Broker-SuspendMaxTimeMillis,就返回空结果。在等待的过程中,Broker收到了新的消息后会直接调用notifyMessageArriving函数返回请求结果。“长轮询”的核心是,Broker端HOLD住客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给Consumer。“长轮询”的主动权还是掌握在Consumer手中,Broker即使有大量消息积压,也不会主动推送给Consumer。

长轮询方式的局限性,是在HOLD住Consumer请求的时候需要占用资源,它适合用在消息队列这种客户端连接数可控的场景中。3.1.3 DefaultMQPushConsumer的流量控制

本节分析PushConsumer的流量控制方法。PushConsumer的核心还是Pull方式,所以采用这种方式的客户端能够根据自身的处理速度调整获取消息的操作速度。因为采用多线程处理方式实现,流量控制的方面比单线程要复杂得多。

PushConsumer有个线程池,消息处理逻辑在各个线程里同时执行,这个线程池的定义如代码清单3-5所示。

代码清单3-5 DefaultMQPushConsumer的线程池定义this.consumeExecutor = new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue,

试读结束[说明:试读内容隐藏了图片]

下载完整电子书


相关推荐

最新文章


© 2020 txtepub下载