RocketMQ技术内幕:RocketMQ架构设计与实现原理(txt+pdf+epub+mobi电子书下载)


发布时间:2020-06-25 17:55:13

点击下载

作者:丁威,周继锋

出版社:机械工业出版社

格式: AZW3, DOCX, EPUB, MOBI, PDF, TXT

RocketMQ技术内幕:RocketMQ架构设计与实现原理

RocketMQ技术内幕:RocketMQ架构设计与实现原理试读:

前言

为什么要写这本书

随着互联网技术蓬勃发展,微服务架构思想的兴起,系统架构开始追求小型化、轻量化,原有的大型集中式的IT系统通常需要进行垂直拆分,孵化出颗粒度更小的众多小型系统,因此对系统间松耦合的要求越来越高,目前RPC、服务治理、消息中间件几乎成为互联网架构的标配。

引入消息中间件,服务之间可以通过可靠的异步调用,降低系统之间的耦合度,提高系统的可用性。消息中间件的另一个重要应用场景是解决系统之间数据的一致性(最终一致性)。

RocketMQ作为阿里开源的一款高性能、高吞吐量的消息中间件,承载了阿里“双11”大部分业务,可以说是一名久经战场的“精英”、值得信任的“伙伴”。同时它的开发语言为Java,自然而然地得到了广大互联网架构师们的青睐,成为互联网行业首选的消息中间件。

初次接触RocketMQ是在听到阿里巴巴正式将RocketMQ捐献给Apache基金会,成为Apache的顶级开源项目时,这意味着承载阿里“双11”巨大流量的消息中间件完全走向开源,对广大Java开发者来说无疑是一个巨大的利好,让我们有机会一睹高性能消息中间件RocketMQ的“真容”。作为一名阿里技术崇拜者,我内心异常激动,于是不假思索地在CSDN上开通了专栏“源码研究RocketMQ”,受到了广大技术朋友的支持。

RocketMQ作为一款高性能消息中间件,其核心优势是可靠的消息存储、消息发送的高性能与低延迟、强大的消息堆积能力与消息处理能力、严格的顺序消息模式等。RocketMQ的另一个核心思想是懂得取舍。软件设计不可能做到面面俱到,消息中间件的理想状态是一条消息能且只能被消费一次,但要做到这一点,必然需要牺牲性能。RocketMQ的设计者解决这一难题的办法是不去解决,即保证消息至少被消费一次,但不承诺消息不会被消费者多次消费,其消费的幂等由消费者实现,从而极大地简化了其实现内核,提高了RocketMQ的整体性能。

自从RocketMQ被捐献给Apache基金会后便在快速发展,RocketMQ的设计者们正在制定消息中间件的新规范,其模块为openmessaging。本书主要是抛砖引玉,与各位读者朋友们探讨RocketMQ的实现原理,使读者能更好地在实际项目中应用RocketMQ。读者对象

□RocketMQ用户和爱好者

□RocketMQ代码开发志愿者

□Java中高级开发工程师

□Java架构师

□有志于从事Java开源的相关技术从业者本书特色

本书从源码的角度对RocketMQ的实现原理进行详细剖析,从中阐述了作者学习阅读源码的方法。本书作为一本源码阅读类书籍,其讲解切入点并不是以组成RocketMQ的一个个源码包进行展开,而是基于功能模块(如Topic路由中心、消息发送、消息存储、消息消费、事务消息)来展开,更加贴近实战需求。如何阅读本书

本书分为三大部分。

第一部分为准备篇(第1章),简单介绍了RocketMQ的设计理念与目标,并介绍了在开发工具中如何对RocketMQ进行代码调试。

第二部分为实现篇(第2~8章),重点讲解了RocketMQ各个功能模块的实现原理,包括NameServer、消息发送、消息存储、消息消费、消息过滤、顺序消息、事务消息等。

第三部分为实例篇(第9章),通过示例展示RocketMQ的使用技巧,着重讲解了RocketMQ的监控命令与监控管理界面。

本书在最后的附录中给出了RocketMQ的主要参数列表及含义,供读者参考。

本书的行文思路主要是根据消息发送的全流程进行展开,从路由管理到消息发送、消息存储、消息消费,再到顺序消息、事务消息,从而实现消息链路的闭环。建议读者按照该思路,带着问题来阅读本书,或许会事半功倍。勘误和支持

除封面署名外,参加本书编写工作的还有陈鹏飞。由于水平有限,编写时间仓促,书中难免会出现一些错误或者不准确的地方,恳请读者批评指正。为此,大家可以通过CSDN博客专栏(https://blog.csdn.net/column/details/20603.html)留言反馈。书中的全部源文件可以从github rocketmq官方仓库中下载,我也会将相应的功能及时更新。如果你有更多的宝贵意见,也欢迎发送邮件至dw19871218pmz@126.com,期待能够得到你的真挚反馈。致谢

首先要感谢MyCAT开源社区负责人周继锋对我的提携与指导,为我的职业发展指明前进的方向。

感谢RocketMQ联盟中每一位充满创意和活力的朋友——奔腾、zenk、共产国际史派克、水动力皮划艇、张登、张凤凰、曾文、季永超,以及名单之外的很多朋友,感谢你们对我的支持与帮助。感谢杨福川老师的引荐,是你的努力才促成了本书的成功出版。

感谢机械工业出版社华章公司的编辑张锡鹏,在这一年多的时间中始终支持我的写作,你的鼓励和帮助引导我能顺利完成全部书稿。

最后感谢我的爸爸、妈妈、爷爷、奶奶,感谢你们将我培养成人,并时时刻刻为我灌输着信心和力量!感谢我的老婆、女儿,你们是我持续努力的最大动力。

谨以此书献给我最亲爱的家人,以及众多热爱RocketMQ的朋友们!丁威

感谢RocketMQ团队,是你们的付出才有这么好的产品,同时感谢杨福川编辑对本书出版工作的支持。

谨以此书献给我最亲爱的家人和同事,以及帮助过、关注过我的人,以及使用、学习过RocketMQ的朋友们!周继锋  第1章 阅读源代码前的准备

研究一款开源中间件,首先我们需要了解它的整体架构以及如何在开发环境调试源码,从代码入手才能快速熟悉一个开源项目,只有这样才能抽丝剥茧地理解透彻,了解作者的设计思想和实现原理。本章将重点介绍RocketMQ的整体设计理念以及如何调试RocketMQ,为后续源码阅读打下扎实的基础。

本章重点内容如下。

·获取和调试RocketMQ源代码

·RocketMQ源代码的目录结构

·RocketMQ的设计理念和设计目标1.1 获取和调试RocketMQ的源代码

RocketMQ原先是阿里巴巴内部使用的消息中间件,于2017年提交到Apache基金会成为Apache基金会的顶级开源项目,GitHub代码库链接:https://github.com/apache/rocketmq.git。在Github网站上搜索RocketMQ,如图1-1所示。图1-1 GitHub RocketMQ搜索界面1.1.1 Eclipse获取RocketMQ源码

Step1:单击右键从菜单中选择import git,弹出如图1-2所示的对话框。图1-2 Import对话框

Step2:点击Next按钮,弹出Projects from Git对话框,如图1-3所示。图1-3 Import Projects from Git对话框

Step3:点击Next按钮,弹出Clone URI对话框,如图1-4所示。图1-4 Import Projects from Git对话框

Step4:继续点击Next进入下一步,选择代码分支,如图1-5所示。图1-5 Import Projects from Git对话框

Step5:选择所需要的分支后点击Next,进入代码存放目录选择,如图1-6所示。图1-6 Import Projects from Git对话框

Step6:点击Next,Eclipse将从远程仓库下载代码,如图1-7所示。图1-7 Import Projects from Git对话框

Step7:代码下载到指定目录后,默认选择Import existing projects(单分支),这里手动选择Import as general projects(多分支),点击Finish,成功导入,如图1-8所示。图1-8 Import Projects from Git对话框

Step8:代码导入成功后,需要将项目转换成Maven项目,导入成功后的效果图,如图1-9所示。

Step9:单击右键从上下文菜单中选择rocketmq_new(文件下载目录名)→Configure→Convert and Detect Nested Projects转换成Maven项目,如图1-10所示。图1-9 导入项目初始状态图1-10 转换Maven项目

Step10:点击Finish执行Maven项目转换,完成RocketMQ的导入,如图1-11所示。图1-11 转换Maven项目

转换过程中可能会弹出如图1-12所示提示框。图1-12 转换Maven项目

解决办法有三种。

1)修改根pom.xml文件,找到如下条目,加上注释。

代码清单1-1 rocketmq根pom.xml文件

2)注释remoting模块下pom.xml文件中部分代码。

代码清单1-2 rocketmq根pom.xml文件

3)右键一个项目,选择Maven→Update Project,如图1-13所示。图1-13 更新Maven项目1.1.2 Eclipse调试RocketMQ源码

本节将展示在Eclipse中启动NameServer、Broker,并运行消息发送与消息消费示例程序。1.启动NameServer

Step1:展开namesrv模块,右键NamesrvStartup.java,移动到Debug As,选中Debug Configurations,弹出Debug Configurations对话框,如图1-14所示。

Step2:选中Java Application条目并单击右键,选择New弹出Debug Configurations对话框,如图1-15所示。

Step3:设置RocketMQ运行主目录。选择Environment选项卡,添加环境变量ROCKET_HOME。

Step4:在RocketMQ运行主目录中创建conf、logs、store三个文件夹,如图1-16所示。图1-14 选择Debug Configurations图1-15 Debug Configurations,Create,manage,and run configurations图1-16 RocketMQ主目录

Step5:从RocketMQ distribution部署目录中将broker.conf、logback_broker.xml文件复制到conf目录中,logback_namesrv.xml文件则只需修改日志文件的目录,broker.conf文件内容如下所示。

代码清单1-3 broker.conf文件brokerClusterName=DefaultClusterbrokerName=broker-abrokerId=0#nameServer地址,分号分割 namesrvAddr=127.0.0.1:9876deleteWhen=04fileReservedTime=48brokerRole=ASYNC_MASTERflushDiskType=ASYNC_FLUSH#存储路径 storePathRootDir=D:\\rocketmq\\store#commitLog 存储路径 storePathCommitLog=D:\\rocketmq\\store\\commitlog#消费队列存储路径 storePathConsumeQueue=D:\\rocketmq\\store\\consumequeue#消息索引存储路径 storePathIndex=D:\\rocketmq\\store\\index#checkpoint 文件存储路径 storeCheckpoint=D:\\rocketmq\\store\\checkpoint#abort 文件存储路径abortFile=D:\\rocketmq\\store\\abort

Step6:在Eclipse Debug中运行NamesrvStartup,并输出“The Name Server boot success.Serializetype=JSON”。2.启动Broker

Step1:展开broker模块,右键BrokerStartup.java,移动到Debug As,选中Debug Configurations,弹出如图1-17所示的对话框,选择arguments选项卡,配置-c属性指定broker配置文件路径。图1-17 Debug Configurations,Create,manage,and run configurations

Step2:切换选项卡Environment,配置RocketMQ主目录,如图1-18所示。图1-18 Debug Configurations,Create,manage,and run configurations

Step3:以Debug模式运行BrokerStartup.java,查看${ROCKET_HOME}/logs/broker.log文件,未报错则表示启动成功。

代码清单1-4 broker启动日志截图2018-03-22 20:47:29 INFO main - register broker to name server 127.0.0.1:9876 OK2018-03-22 20:47:29 INFO main - The broker[broker-a, 192.168.1.3:10911] boot success. serializeType=JSON and name server is 127.0.0.1:98762018-03-22 20:47:38 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes2018-03-22 20:47:38 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes2018-03-22 20:47:39 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK2018-03-22 20:48:09 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK2018-03-22 20:48:37 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes2018-03-22 20:48:37 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes2018-03-22 20:48:39 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK2018-03-22 20:49:09 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK3.使用RocketMQ提供的实例验证消息发送与消息消费

Step1:修改org.apache.rocketmq.example.quickstart.Producer示例程序,设置消息生产者NameServer地址。

代码清单1-5 消息发送示例程序public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); for (int i = 0; i < 1; i++) { try { Message msg = new Message("TopicTest"/* Topic */,"TagA"/* Tag */, ("Hello RocketMQ " + i).getBytes (RemotingHelper.DEFAULT_CHARSET)/* Message body */ ); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); }}

Step2:运行该示例程序,查看运行结果,如果输出代码清单1-6所示结果则表示消息发送成功。

代码清单1-6 消息发送结果SendResult [sendStatus=SEND_OK, msgId=C0A8010325B46D06D69C70A211400000,offsetMsgId=C0A8010300002A9F0000000000000000, messageQueue=MessageQueue[topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]

Step3:修改org.apache.rocketmq.example.quickstart.Consumer示例程序,设置消息消费者NameServer地址。

代码清单1-7 消息消费示例程序public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); }}

Step4:运行消息消费者程序,如果输出如下所示则表示消息消费成功。

代码清单1-8 消息消费结果Consumer Started.ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0,storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1521723269443,bornHost=/192.168.1.3:57034, storeTimestamp=1521723269510,storeHost=/192.168.1.3:10911, msgId=C0A8010300002A9F0000000000000000,commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0,preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0,properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1521723841419,UNIQ_KEY=C0A8010325B46D06D69C70A211400000, WAIT=true, TAGS=TagA}, body=16]]]

消息发送与消息消费都成功,则说明RocketMQ调试环境已经成功搭建了,可以直接Debug源码,探知RocketMQ的实现奥秘了。1.1.3 IntelliJ IDEA获取RocketMQ源码

Step1:在IntelliJ IDEA VCS菜单中选择Check from Version Control,再选择Git,如图1-19所示。图1-19 Git clone对话框

Step2:在弹出的对话框中,URL输入RocketMQ源码地址,选择保存的本地路径,点击Clone,弹出Git Repository对话框,如图1-20所示。图1-20 Git Repository对话框

状态栏有代码检出的进度,如图1-21所示。图1-21 RocketMQ Clon进度条

Step3:检出完成会弹出提示框,选择Yes,如图1-22所示。图1-22 Checkout From Version Control

Step4:在弹出框中选择Maven,点击Next,如图1-23所示。图1-23 Import Project

Step5:勾选jdk8,点击Next,如图1-24所示。

Step6:下面2步都直接点击Next,如图1-25、图1-26和图1-27所示。图1-24 Import Project select profiles图1-25 Select maven projects to import图1-26 Create a new Project图1-27 Import RocketMQ

Step7:导入成功后,效果图如图1-28所示。图1-28 RocketMQ项目结构

Step8:执行maven命令clean install,进行编译和下载依赖,下载完成后,可以看到控制台BUILD SUCCESS的提示信息,如图1-29所示。图1-29 mvn clean install RocketMQ

试读结束[说明:试读内容隐藏了图片]

下载完整电子书


相关推荐

最新文章


© 2020 txtepub下载