ApacheKafka源码剖析(txt+pdf+epub+mobi电子书下载)


发布时间:2020-08-19 08:16:18

点击下载

作者:徐郡明

出版社:电子工业出版社

格式: AZW3, DOCX, EPUB, MOBI, PDF, TXT

ApacheKafka源码剖析

ApacheKafka源码剖析试读:

前言

这是一个数据大爆炸的时代,互联网成为了数据传播的主要载体。大数据处理平台在现代化的互联网公司进行商业决策、规划发展、市场拓展等方面扮演着越来越重要的角色。Kafka作为大数据平台的重要组件之一,受到越来越多的设计人员和开发人员的青睐,Kafka的社区也变得越来越活跃,Kafka本身的架构设计、应用场景也得到了长足的发展。

Kafka最开始由LinkedIn设计开发,并于2011年年初开源,2012年10月成为Apache基金会的顶级项目。目前Kafka为越来越多的分布式大数据处理系统提供支持,其中也包括著名的Apache Spark,LinkedIn、Netflix、Uber、Verizon、网易、美团等互联网公司也选择以Kafka为基础搭建其大数据处理平台或消息中间件系统。随着Kafka的应用场景越来越丰富,用户对Kafka的吞吐量、可扩展性、稳定性和可维护性等有了更多的期许,也有很多开发人员参与到Kafka的开发建议制定和代码提交中。在Kafka 0.10.X版本中出现了很多令人欣喜的新功能,本书深入剖析了Kafka 0.10.X的内部设计和实现细节。

本书以Kafka 0.10.0版本源码为基础,深入剖析了Kafka的各个模块的实现,包括Kafka的生产者客户端、消费者客户端、服务端的各个模块以及常用的管理脚本。笔者对Kafka设计的理解和经验分享也穿插在了剖析源码的过程中,希望读者能够通过本书理解Kafka的设计原理和源码实现,同时也学习到Kafka中优秀的设计思想以及Java和Scala的编程技巧和规范。如何阅读本书

由于本书的篇幅限制,本书并没有详细介绍Kafka源码中涉及的所有基础知识,例如Java NIO、J.U.C包中工具类的使用、命令行参数解析器的使用等,为方便读者阅读,笔者仅介绍了一些必须且重要的基础知识。在开始源码分析之前,希望读者按照第1章的相关介绍完成Kafka源码环境的搭建,并了解Kafka的核心概念,这样也可以有更好的学习效果。

本书共五章,它们互相之间的联系并不是很强,读者可以从头开始阅读,也可以选择自己感兴趣的章节进行学习。

第1章是Kafka的快速入门,其中介绍了Kafka的背景、特性以及应用场景。之后介绍了笔者在实践中遇到的一个以Kafka为中心的案例,并分析了在此案例中选择使用Kafka的具体原因和Kafka起到的关键作用。最后介绍了Kafka中的核心概念和Kafka源码调试环境的搭建。

第2章介绍了生产者客户端的设计特点和实现细节,剖析了KafkaProducer拦截消息、序列化消息、路由消息等功能的源码实现,介绍了RecordAccumulator的结构和实现。最后剖析了KafkaProducer中Sender线程的源码。

第3章介绍了Kafka的消息传递保证语义并给出了相关的实践建议,还介绍了Consumer Group Rebalance操作各个版本方案的原理和弊端。最后详细剖析了KafkaConsumer相关组件的运行原理和实现细节。

第4章介绍了构成Kafka服务端的各个组件,依次分析了Kafka网络层、API层、日志存储、DelayedOperationPurgatory组件、Kafka的副本机制、KafkaController、GroupCoordinator、Kafka的身份认证与权限控制以及Kafka监控相关的实现。本章是Kafka的核心内容,涉及较多的设计细节和编程技巧,希望读者阅读之后有所收获。

第5章介绍了Kafka提供的多个脚本工具的使用以及具体实现原理,了解这些脚本可以帮助管理人员快速完成一些常见的管理、运维、测试功能。

如果读者在阅读本书的过程中,发现任何不妥之处,请将您宝贵的意见和建议发送到邮箱xxxlxy2008@163.com,也欢迎读者朋友通过此邮箱与笔者进行交流。致谢

感谢电子工业出版社博文视点的陈晓猛老师,是您的辛勤工作让本书的出版成为可能。同时还要感谢许多我不知道名字的幕后工作人员为本书付出的努力。

感谢张占龙、张亚森、杨威、刘克刚、刘思等朋友在百忙之中抽出时间对本书进行审阅和推荐。感谢林放、米秀明、星亮亮、王松洋、褚洪洋、曾天宁、葛彬、赵美凯、顾聪慧、孙向川、段鑫冬、彭海蛟、赵仁伟等同事,帮助我解决工作中的困难。

感谢冯玉玉、李成伟,是你们让写作的过程变得妙趣横生,是你们让我更加积极、自信,也是你们的鼓励让我完成了本书的写作。

最后,特别感谢我的母亲大人,谢谢您默默为我做出的牺牲和付出,您是我永远的女神。徐郡明读者服务

轻松注册成为博文视点社区用户(www.broadview.com.cn),您即可享受以下服务:● 提交勘误:您对书中内容的修改意见可在【提交勘误】处提交,

若被采纳,将获赠博文视点社区积分(在您购买电子书时,积分

可用来抵扣相应金额)。● 与作者交流:在页面下方【读者评论】处留下您的疑问或观点,

与作者和其他读者一同学习交流。

页面入口:http://www.broadview.com.cn/31345

二维码:专家推荐《Apache Kafka源码剖析》一书深入浅出地分析了Kafka的源代码,无论是刚接触Kafka的菜鸟,还是已经有多年Kafka使用经验的老鸟,这本书都能让你有所收获。——搜狗高级研发工程师 张亚森

Kafka是大数据平台中的关键部分之一。《Apache Kafka源码剖析》全面细致地剖析了Kafka的运行原理和架构设计,在带领读者进入Kafka源码世界的同时,也分析了许多设计经验,是一本不可多得的好书。——华为高级研发工程师 张占龙

在阅读《Apache Kafka源码剖析》时,作者在每一章节中都会给我意外之惊喜。作者对Kafka源代码已有相当深刻的理解,此书代码分析过程逻辑清晰,详略得当,实属不易。——网易游戏高级数据挖掘研究员 杨威

大型分布式系统犹如一个生命,系统中各个服务犹如骨骼,其中的数据犹如血液,而Kafka犹如经络,串联整个系统。《Apache Kafka源码剖析》通过大量的设计图展示、代码分析、示例分享,把Kafka的实现脉络展示在读者面前,帮助读者更好地研读Kafka代码。——今日头条高级研发工程师 刘克刚《Apache Kafka源码剖析》中汇集了作者多年Kafka开发经验,为读者深入学习Kafka实现指明了方向。对于想学习Kafka的程序员来说,这是一本非常不错的进阶书籍。——美团高级研发工程师 刘思第1章快速入门1.1 Kafka简介

Apache Kafka是一种分布式的、基于发布/订阅的消息系统,由Scala语言编写而成。它具备快速、可扩展、可持久化的特点。Kafka最初由LinkedIn开发,并于2011年初开源,2012年10月从Apache孵化器毕业,成为Apache基金会的顶级项目。目前,越来越多的开源分布式处理系统支持与Kafka集成,例如:Apache Storm、Spark。也有越来越多的公司在Kafka的基础上建立了近乎实时的信息处理平台,例如:LinkedIn、Netflix、Uber和Verizon。在国内也有很多互联网公司在生产环境中使用Kafka作为其消息中间件。

Kafka之所以受到越来越多开发人员的青睐,主要与其关键特性相关。● Kafka具有近乎实时性的消息处理能力,即使面对海量消息也能

够高效地存储消息和查询消息。Kafka将消息保存在磁盘中,在

其设计理念中并不惧怕磁盘操作,它以顺序读写的方式访问磁

盘,从而避免了随机读写磁盘导致的性能瓶颈。● Kafka支持批量读写消息,并且会对消息进行批量压缩,这样既

提高了网络的利用率,也提高了压缩效率。● Kafka支持消息分区,每个分区中的消息保证顺序传输,而分区

之间则可以并发操作,这样就提高了Kafka的并发能力。● Kafka也支持在线增加分区,支持在线水平扩展。● Kafka支持为每个分区创建多个副本,其中只会有一个Leader副

本负责读写,其他副本只负责与Leader副本进行同步,这种方式

提高了数据的容灾能力。Kafka会将Leader副本均匀地分布在集

群中的服务器上,实现性能最大化。

随着Kafka在各大公司的实践应用,Kafka的应用场景也变得越来越丰富。● 在应用系统中可以将Kafka作为传统的消息中间件,实现消息队

列和消息的发布/订阅,在某些场景下,性能会超越RabbitMQ、

ActiveMQ等传统的消息中间件。● Kafka也被用作系统中的数据总线,将其接入多个子系统中,子

系统会将产生的数据发送到Kafka中保存,之后流转到目的系统

中。● Kafka还可以用作日志收集中心,多个系统产生的日志统一收集

到Kafka中,然后由数据分析平台进行统一处理。日志会被Kafka

持久化到磁盘,所以同时支持离线数据处理和实时数据处理。● 现在也有开发人员基于Kafka设计数据库主从同步的工具。1.2 以Kafka为中心的解决方案

在大数据、高并发的系统中,为了突破瓶颈,会将系统进行水平扩展和垂直拆分,形成独立的服务。每个独立的服务背后,可能是一个集群在对外提供服务。这就会碰到一个问题,整个系统是由多个服务(子系统)组成的,数据需要在各个服务中不停流转。如果数据在各个子系统中传输时,速度过慢,就会形成瓶颈,降低整个系统的性能。

下面介绍的场景是笔者工作中遇到的一个案例,在一个政企信息化的云平台网站上,用户与网站交互的很多操作行为(例如,浏览某些新闻等)都会被记录下来,等待后台的多个子系统进行消费,其中比较重要的几个子系统是利用这些数据进行机器学习或是数据挖掘,产生用户的侧写。这样,网站就可以根据用户的侧写,推送给他们需要的配置和查询信息。图1-1就是这个云平台系统的架构图,其中每一个箭头都表示一条数据流。图1-1

上面的架构中涉及的子系统、存储、服务种类繁多,而且它们之间都存在较强的耦合,会出现下面的问题:● 由于子系统之间存在的耦合性,两个存储之间要进行数据交换的

话,开发人员就必须了解这两个存储系统的API,不仅是开发成

本,就连维护成本也会很高。一旦其中一个子系统发生变化,就

可能影响其他多个子系统,这简直就是一场灾难。● 在某些应用场景中,数据的顺序性尤为重要,一旦数据出现乱序,

就会影响最终的计算结果,降低用户体验,这就提高了开发的难

度。● 除了考虑数据顺序性的要求,还要考虑数据重传等提高可靠性的

机制,毕竟通过网络进行传输并不可靠,可能出现丢失数据的情

况。● 进行数据交换的两个子系统,无论哪一方宕机,重新上线之后,

都应该恢复到之前的传输位置,继续传输。尤其是对于非幂等性

的操作,恢复到错误的传输位置,就会导致错误的结果。● 随着业务量的增长,系统之间交换的数据量会不断地增长,水平

可扩展的数据传输方式就显得尤为重要。

针对这个案例,我们看看Kafka是如何有效地解决上面的这些问题的(Kafka中的相关概念可以参见下文相关内容)。

• 解耦合

将Kafka作为整个系统的中枢,负责在任意两个系统之间传递数据。架构如图1-2所示,所有的存储只与Kafka通信,开发人员不需要再去了解各个子系统、服务、存储的相关接口,只需要面向Kafka编程即可。这样,在需要进行数据交换的子系统之间形成了一个基于数据的接口层,只有这两者知道消息存放的Topic、消息中数据的格式。当需要扩展消息格式时,只需要修改相关子系统的Kafka客户端即可。这样,与Kafka通信的模块就可以实现复用,Kafka则承担数据总线的作用。更简单点说,就像是一个生产者—消费者模式,而Kafka则扮演其中“队列”的角色。图1-2

• 数据持久化

在分布式系统中,各个组件是通过网路连接起来的。一般认为网络传输是不可靠的,当数据在两个组件之间进行传递的时候,传输过程可能会失败。除非数据被持久化到磁盘,否则就可能造成消息的丢失。Kafka把数据以消息的形式持久化到磁盘,即使Kafka出现宕机,也可以保证数据不会丢失,通过这一方式规避了数据丢失风险。为了避免磁盘上的数据不断增长,Kafka提供了日志清理、日志压缩等功能,对过时的、已经处理完成的数据进行清除。在磁盘操作中,耗时最长的就是寻道时间,这是导致磁盘的随机I/O性能很差的主要原因。为了提高消息持久化的性能,Kafka采用顺序读写的方式访问,实现了高吞吐量。

• 扩展与容灾

Kafka的每个Topic(主题)都可以分为多个Partition(分区),每个分区都有多个Replica(副本),实现消息冗余备份。每个分区中的消息是不同的,这类似于数据库中水平切分的思想,提高了并发读写的能力。而同一分区的不同副本中保存的是相同的消息,副本之间是一主多从的关系,其中Leader副本负责处理读写请求,Follower副本则只与Leader副本进行消息同步,当Leader副本出现故障时,则从Follower副本中重新选举Leader副本对外提供服务。这样,通过提高分区的数量,就可以实现水平扩展;通过提高副本的数量,就可以提高容灾能力。

Kafka的容灾能力不仅体现在服务端,在Consumer端也有相关设计。Consumer使用pull方式从服务端拉取消息,并且在Consumer端保存消费的具体位置,当消费者宕机后恢复上线,可以根据自己保存的消费位置重新拉取需要的消息进行消费,这就不会造成消息丢失。也就是说,Kafka不决定何时、如何消费消息,而是Consumer自己决定何时、如何消费消息。

Kafka还支持Consumer的水平扩展能力。我们可以让多个Consumer加入一个Consumer Group(消费组),在一个Consumer Group中,每个分区只能分配给一个Consumer消费,当Kafka服务端通过增加分区数量进行水平扩展后,我们可以向Consumer Group中增加新的Consumer来提高整个Consumer Group的消费能力。当Consumer Group中的一个Consumer出现故障下线时,会通过Rebalance操作将下线Consumer负责处理的分区分配给其他Consumer继续处理;当下线Consumer重新上线加入Consumer Group时,会再进行一次Rebalance操作,重新分配分区。当然,一个COnsumer Group可以订阅很多不同的Topic,每个Consumer可以同时处理多个分区。

• 顺序保证

在很多场景下,数据处理的顺序都很重要,不同的顺序就可能导致不同的计算结果。Kafka保证一个Partition内消息的有序性,但是并不保证多个partition之间的数据有顺序。

• 缓冲&峰值处理能力

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如图1-3所示,在9点到10点之间,是此云平台系统的访问峰值,而其他时间的访问量则很少。图1-3

如果按照处理这类峰值请求量为标准来投入资源的话,会有相当一部分资源处于待命状态,这无疑是巨大的浪费。使用Kafka能够使关键组件顶住突发的访问压力,而不会因为突发的峰值请求而使系统完全崩溃不可用。

• 异步通信

Kafka为系统提供了异步处理能力。例如,两个系统需要通过网络进行数据交换,其中一端可以把一个消息放入Kafka中后立即返回继续执行其他路基,不需要等待对端的响应。待后者将处理结果放入Kafka中之后,前者可以从其中获取并解析响应。1.3 Kafka核心概念

如果读者已经对下面的概念非常熟悉,可以快速阅读或直接跳过本节,直接开始搭建Kafka的源码调试环境。

• 消息

消息是Kafka中最基本的数据单元。消息由一串字节构成,其中主要由key和value构成,key和value也都是byte数组。key的主要作用是根据一定的策略,将此消息路由到指定的分区中,这样就可以保证包含同一key的消息全部写入同一分区中,key可以是null。消息的真正有效负载是value部分的数据。为了提高网络和存储的利用率,生产者会批量发送消息到Kafka,并在发送之前对消息进行压缩,具体的细节在本书后面的章节会详细介绍。

• Topic&分区&Log

Topic是用于存储消息的逻辑概念,可以看作一个消息集合。每个Topic可以有多个生产者向其中推送(push)消息,也可以有任意多个消费者消费其中的消息,如图1-4所示。图1-4

每个Topic可以划分成多个分区(每个Topic都至少有一个分区),同一Topic下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个offset,它是消息在此分区中的唯一编号,Kafka通过offset保证消息在分区内的顺序,offset的顺序性不跨分区,即Kafka只保证在同一个分区内的消息是有序的;同一Topic的多个分区内的消息,Kafka并不保证其顺序性,如图1-5所示。图1-5

同一Topic的不同分区会分配在不同的Broker(Broker的概念见下文)上。分区是Kafka水平扩展性的基础,我们可以通过增加服务器并在其上分配Partition的方式来增加Kafka的并行处理能力。

分区在逻辑上对应着一个Log,当生产者将消息写入分区时,实际上是写入到了分区对应的Log中。Log是一个逻辑概念,可以对应到磁盘上的一个文件夹。Log由多个Segment组成,每个Segment对应一个日志文件和索引文件。在面对海量数据时,为避免出现超大文件,每个日志文件的大小是有限制的,当超出限制后则会创建新的Segment,继续对外提供服务。这里要注意,因为Kafka采用顺序I/O,所以只向最新的Segment追加数据。为了权衡文件大小、索引速度、占用内存大小等多方面因素,索引文件采用稀疏索引的方式,大小并不会很大,在运行时会将其内容映射到内存,提高索引速度。

• 保留策略(Retention Policy)&日志压缩(Log Compaction)

无论消费者是否已经消费了消息,Kafka都会一直保存这些消息,但并不会像数据库那样长期保存。为了避免磁盘被占满,Kafka会配置相应的“保留策略”(retention policy),以实现周期性地删除陈旧的消息。

Kafka中有两种“保留策略”:一种是根据消息保留的时间,当消息在Kafka中保存的时间超过了指定时间,就可以被删除;另一种是根据Topic存储的数据大小,当Topic所占的日志文件大小大于一个阈值,则可以开始删除最旧的消息。Kafka会启动一个后台线程,定期检查是否存在可以删除的消息。“保留策略”的配置是非常灵活的,可以有全局的配置,也可以针对Topic进行配置覆盖全局配置。

除此之外,Kafka还会进行“日志压缩”(Log Compaction)。在很多场景中,消息的key与value的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心key对应的最新value值。此时,可以开启Kafka的日志压缩功能,Kafka会在后台启动一个线程,定期将相同key的消息进行合并,只保留最新的value值。日志压缩的工作原理如图1-6所示,图1-6展示了一次日志压缩过程的简化版本,为了图片清晰,只展示了key3的压缩过程。图1-6

保留策略和日志压缩的相关配置和实现细节,在本书后面的章节会详细介绍。

• Broker

一个单独的Kafka server就是一个Broker。Broker的主要工作就是接收生产者发过来的消息,分配offset,之后保存到磁盘中;同时,接收消费者、其他Broker的请求,根据请求类型进行相应处理并返回响应。在一般的生产环境中,一个Broker独占一台物理服务器。

• 副本

Kafka对消息进行了冗余备份,每个Partition可以有多个副本,每个副本中包含的消息是一样的(在同一时刻,副本之间其实并不是完全一样的,本书后面在介绍副本机制的时候会再进行说明)。每个分区至少有一个副本,当分区中只有一个副本时,就只有Leader副本,没有Follower副本。

每个分区的副本集合中,都会选举出一个副本作为Leader副本,Kafka在不同的场景下会采用不同的选举策略,具体策略和原理在后面详细介绍。所有的读写请求都由选举出的Leader副本处理,其他都作为Follower副本,Follower副本仅仅是从Leader 副本处把数据拉取到本地之后,同步更新到自己的Log中。图1-7展示了一个拥有三个Replica的Partition。图1-7

一般情况下,同一分区的多个分区会被分配到不同的Broker上,这样,当Leader所在的Broker宕机之后,可以重新选举新的Leader,继续对外提供服务。

• ISR集合

ISR(In-Sync Replica)集合表示的是目前“可用”(alive)且消息量与Leader相差不多的副本集合,这是整个副本集合的一个子集。“可用”和“相差不多”都是很模糊的描述,其实际含义是ISR集合中的副本必须满足下面两个条件:(1)副本所在节点必须维持着与ZooKeeper的连接。(2)副本最后一条消息的offset与Leader副本的最后一条消息的offset之间的差值不能超出指定的阈值。

每个分区中的Leader副本都会维护此分区的ISR集合。写请求首先由Leader副本处理,之后Follower副本会从Leader上拉取写入的消息,这个过程会有一定的延迟,导致Follower副本中保存的消息略少于Leader副本,只要未超出阈值都是可以容忍的。如果一个Follower副本出现异常,比如:宕机,发生长时间GC而导致Kafka僵死或是网络断开连接导致长时间没有拉取消息进行同步,就会违反上面的两个条件,从而被Leader副本踢出ISR集合。当Follower副本从异常中恢复之后,会继续与Leader副本进行同步,当Follower副本“追上”(即最后一条消息的offset的差值小于指定阈值)Leader副本的时候,此Follower副本会被Leader副本重新加入到ISR中。

• HW&LEO

HW(HighWatermark)和LEO与上面的ISR集合紧密相关。HW标记了一个特殊的offset,当消费者处理消息的时候,只能拉取到HW之前的消息,HW之后的消息对消费者来说是不可见的。与ISR集合类似,HW也是由Leader副本管理的。当ISR集合中全部的Follower副本都拉取HW指定消息进行同步后,Leader副本会递增HW的值。Kafka官方网站将HW之前的消息的状态称为“commit”,其含义是这些消息在多个副本中同时存在,即使此时Leader副本损坏,也不会出现数据丢失。

LEO(Log End Offset)是所有的副本都会有的一个offset标记,它指向追加到当前副本的最后一个消息的offset。当生产者向Leader副本追加消息的时候,Leader副本的LEO标记会递增;当Follower副本成功从Leader副本拉取消息并更新到本地的时候,Follower副本的LEO就会增加。

为了让读者更好地理解HW和LEO之间的关系,下面通过一个示例进行分析,图1-8中展示了针对offset为11的消息,ISR集合、HW与LEO是如何协调工作的:图1-8

①Producer向此Partition推送消息。

②Leader副本将消息追加到Log中,并递增其LEO。

③Follower副本从Leader副本拉取消息进行同步。

④Follower副本将拉取到的消息更新到本地Log中,并递增其LEO。

⑤当ISR集合中所有副本都完成了对offset=11的消息的同步,Leader副本会递增HW。

在①~⑤步完成之后,offset=11的消息就对生产者可见了。

了解了Replica复制的原理之后,请读者考虑一下,为什么Kafka要这么设计?在分布式存储中,冗余备份是常见的一种设计,常用的方案有同步复制和异步复制:● 同步复制要求所有能工作的Follower副本都复制完,这条消息才

会被认为提交成功。一旦有一个Follower副本出现故障,就会导

致HW无法完成递增,消息就无法提交,生产者获取不到消息。

这种情况下,故障的Follower副本会拖慢整个系统的性能,甚至

导致整个系统不可用。● 异步复制中,Leader副本收到生产者推送的消息后,就认为此消

息提交成功。Follower副本则异步地从Leader副本同步消息。这

种设计虽然避免了同步复制的问题,但同样也存在一定的风险。

现在假设所有Follower副本的同步速度都比较慢,它们保存的消

息量都远远落后于Leader副本,如图1-9所示。图1-9

此时Leader副本所在的Broker突然宕机,则会重新选举新的Leader副本,而新Leader副本中没有原来Leader副本的消息,这就出现了消息的丢失,而有些消费者则可能消费了这些丢失的消息,状态变得不可控。

Kafka权衡了同步复制和异步复制两种策略,通过引入了ISR集合,巧妙地解决了上面两种方案存在的缺陷:当Follower副本的延迟过高时,Leader副本被踢出ISR集合,消息依然可以快速提交,生产者可以快速得到响应,避免高延时的Follower副本影响整个Kafka集群的性能。当Leader副本所在的Broker突然宕机的时候,会优先将ISR集合中Follower副本选举为Leader副本,新Leader副本中包含了HW之前的全部消息,这就避免了消息的丢失。值得注意是,Follower副本可以批量地从Leader副本复制消息,这就加快了网络I/O,Follower 副本在更新消息时是批量写磁盘,加速了磁盘的I/O,极大减少了Follower与Leader的差距。

• Cluster&Controller

多个Broker可以做成一个Cluster(集群)对外提供服务,每个Cluster当中会选举出一个Broker来担任Controller,Controller是Kafka集群的指挥中心,而其他Broker则听从Controller指挥实现相应的功能。Controller负责管理分区的状状态、管理每个分区的副本状态、监听Zookeeper中数据的变化等工作。Controller也是一主多从的实现,所有Broker都会监听Controller Leader的状态,当Leader Controller出现故障时则重新选举新的Controller Leader。Controller的具体细节在后面介绍。

• 生产者

生产者(Producer)的主要工作是生产消息,并将消息按照一定的规则推送到Topic的分区中。这里选择分区的“规则”可以有很多种,例如:根据消息的key的Hash值选择分区,或按序轮训全部分区的方式。

• 消费者

消费者(Consumer)的主要工作是从Topic中拉取消息,并对消息进行消费。某个消费者消费到Partition的哪个位置(offset)的相关信息,是Consumer自己维护的。在图1-10中,三个消费者同时消费同一个分区,各自管理自己的消费位置。图1-10

这样设计非常巧妙,避免了Kafka Server端维护消费者消费位置的开销,尤其是在消费数量较多的情况下。另一方面,如果是由Kafka Server端管理每个Consumer消费状态,一旦Kafka Server端出现延时或是消费状态丢失,将会影响大量的Consumer。同时,这一设计也提高了Consumer的灵活性,Consumer可以按照自己需要的顺序和模式拉取消息进行消费。例如:Consumer可以通过修改其消费的位置实现针对某些特殊key的消息进行反复消费,或是跳过某些消息的需求。

• Consumer Group

在Kafka中,多个Consumer可以组成一个Consumer Group,一个Consumer只能属于一个Consumer Group。Consumer Group保证其订阅的Topic的每个分区只被分配给此Consumer Group中的一个消费者处理。如果不同Consumer Group订阅了同一Topic,Consumer Group彼此之间不会干扰。这样,如果要实现一个消息可以被多个消费者同时消费(“广播”)的效果,则将每个消费者放入单独的一个Consumer Group;如果要实现一个消息只被一个消费者消费(“独占”)的效果,则将所有的Consumer放入一个Consumer Group中。在Kafka官网的介绍中,将Consumer Group称为“逻辑上的订阅者”(logical subscriber),从这个角度看,是有一定道理的。

图1-11展示了一个Consumer Group中消费者与分区之间的对应关系,其中,Consumer1和Consumer2分别消费Partition0和Partition1,而Partition2和Partition3同时分配给了Consumer3进行处理。图1-11

Consumer Group除了实现“独占”和“广播”模式的消息处理,Kafka还通过Consumer Group实现了消费者的水平扩展和故障转移。在图1-11中,当Consumer3的处理能力不足以处理两个Partition中的数据时,可以通过向Consumer Group中添加消费者的方式,触发Rebalance操作重新分配分区与消费者的对应关系,从而实现水平扩展。如图1-12所示,添加Consumer4之后,Consumer3只消费Partition3中的消息,Partition4中的消息则由Consumer4来消费。图1-12

下面来看消费者出现故障的场景,当Consumer4宕机时,Consumer Group会自动重新分配分区,如图1-13所示,由Consumer3接管Consumer4对应的分区继续处理。图1-13

注意,Consumer Group中消费者的数量并不是越多越好,当其中消费者数量超过分区的数量时,会导致有消费者分配不到分区,从而造成消费者的浪费。

介绍完Kafka的核心概念,我们通过图1-14进行总结,并从更高的视角审视整个Kafka集群的架构。图1-14

如图1-14所示,生产者会根据业务逻辑产生消息,之后根据路由规则将消息发送到指定分区的Leader副本所在的Broker上。在Kafka服务端接收到消息后,会将消息追加到Log中保存,之后Follower副本会与Leader副本进行同步,当ISR集合中所有副本都完成了此消息的同步后,则Leader副本的HW会增加,并向生产者返回响应。

当消费者加入到Consumer Group时,会触发Rebalance操作将分区分配给不同的消费者消费。随后,消费者会恢复其消费位置,并向Kafka服务端发送拉取消息的请求,Leader副本会验证请求的offset以及其他相关信息,最后返回消息。1.4 搭建Kafka源码环境

在开始分析Kafka的源码之前,我们先要动手搭建Kafka源码的调试环境。需要准备的软件有:Java JDK、Scala-2.10、gradle-3.1、ZooKeeper-3.4.9。本书选择的Kafka版本是kafka-0.10.0.1,IDE环境是IntelliJ IDEA。

• 安装配置JDK

本书是在Window系统上搭建Kafka的源码环境,使用的JDK版本是1.8.0,环境变量配置如图1-15所示。JDK安装以及环境变量的配置不再赘述,请读者自行查阅相关文档。图1-15

使用java-version命令验证,输出如图1-16所示。图1-16

• 安装配置Scala

Kafka服务端使用Scala语言编写,本书使用的Scala-2.10版本,Scala的环境变量配置如图1-17所示。图1-17

使用scala –version命令验证安装是否正确,输出如图1-18所示。图1-18

• 安装配置Gradle

使用Gradle进行源码构建,Gradle是一个基于Apache Ant和Apache Maven概念的项目自动化建构工具,本书使用Gradle 3.1版本。从Gradle官网下载gradle-3.1-bin.zip,解压缩到指定地点,配置GRADLE_HOME以及Path环境变量,如图1-19所示。图1-19

配置完成后,使用gradle-version命令进行验证,输出如图1-20所示。图1-20

• 搭建ZooKeeper环境

ZooKeeper是一个分布式的分布式应用程序协调服务,很多分布式系统都依赖与ZooKeeper集群实现分布式系统间的协调调度,例如:HDFS 2.x、Hbase、Kafka以及新兴的数据库中间件MyCat,ZooKeeper已经成为现代分布式系统的标配。通过ZooKeeper可以实现很多高级功能,例如:统一命名服务、统一配置管理、分布式锁、分布式队列等。

Kafka使用ZooKeeper集群管理元数据,例如:记录Topic名称、分区以及其副本分配等信息,用户权限控制的相关数据等。Kafka还会在ZooKeeper的某些上添加相应的监听器,用于监听集群的状态,例如:集群中所有Broker通过ZooKeeper监听Controller Leader的状态。我们也可以通过Kafka自带的脚本修改ZooKeeper触发相应操作,例如:“优先副本”选举。

下面我们开始搭建ZooKeeper的环境。从ZooKeeper的官网下载其二进制压缩包,之后解压缩,本书使用的版本是3.4.9,单机模式(毕竟重点是Kafka,而且读者的个人计算机资源有限,若使用集群模式,请读者自行参考相关资料)。

解压后,将%ZOOKEEPER%/conf/zoo_sample.cfg文件复制一份,重命名为zoo.cfg。修改zoo.cfg配置文件,这里主要就是修改dataDir配置项,此配置指向ZooKeeper存储数据的目录,其他配置可以不进行修改,如图1-21所示。图1-21

由于Kafka使用依赖于ZooKeeper服务,在启动Kafka之前,需要先使用zkServer命令,启动上文已配置好的ZooKeeper服务,如图1-22所示。图1-22

Kafka源码构建

本书以kafka-0.10.0.1版本为基础进行源码分析。首先,从Apache Kafka官网下载其源码包kafka-0.10.0.1-src.tgz,解压。使用命令行导航到Kafka源代码根目录下,使用gradle idea命令进行构建(如果读者希望构建Eclipse工程,请使用gradle eclipse命令构建),构建过程中会从网上下载各种依赖包,时间会有点长,请耐心等待。最终输出BUILD SUCCESSFUL字样,表示构建成功,如图1-23所示。图1-23

• 安装Scala插件

本书使用IntelliJ IDEA进行源码分析,并安装了其对应的Scala插件。本书使用的Scala插件版本是scala-intellij-bin-2016.2.1.zip,请读者选择合适的插件版本。建议使用离线安装方式,安装方法如图1-24所示。图1-24

• 配置、启动Kafka

在Kafka服务端使用log4j输出日志,启动前需要把log4j.properties配置文件放置到src/main/scala路径下,然后运行程序,这样才能正确输出日志信息。此log4j.properties文件可以从config目录中获取,如图1-25所示。图1-25

server.properties是Kafka的主要配置文件,下面简单介绍其中的相关配置项的含义。必须修改的配置项就是log.dirs,其他配置读者可以根据需求自行修改。 # 每一个Broker在集群中的唯一标识。即使Broker的IP地址发生了变化,broker.id只要没变, # 则不会影响consumers的消息情况 broker.id=0 # 是否允许Topic被删除。如果是false,使用管理员工具删除Topic的时候,Kafka并不会处 # 理此操作 # delete.topic.enable=true # Kafka服务端是否可以根据请求自动创建Topic,默认是true。如果打开此选项,下面三 # 种请求会触发Topic自动创建: # ①Producer向某个不存在的Topic写入消息 # ②Consumer从某个不存在的Topic读取消息 # ③Consumer从某个不存在的Topic读取消息 # 建议将此选项设置为false,并在使用Topic之前手动创建 # auto.create.topics.enable=true #################### 下面是服务端网络相关的配置 #################### # Kafka Server使用的协议、主机名以及端口的格式如下: # listeners = security_protocol://host_name:port # 参考示例: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092 #这是默认配置,使用PLAINTEXT,端口是9092 # 接收请求的线程数 num.network.threads=3 # 执行请求的线程数 num.io.threads=8 # 在介绍下面两个缓冲区设置之前,先来介绍一下相关背景知识: # 每个TCP socket在内核中都有一个发送缓冲区(SO_SNDBUF)和一个接收缓冲区 #(SO_RCVBUF)。接收缓冲区把数据缓存入内核,应用进程一直没有调用read进行读取的话, # 此数据会一直缓存在相应socket的接收缓冲区内。再啰嗦一点,不管进程是否读取socket, # 对端发来的数据都会经由内核接收并且缓存到socket的内核接收缓冲区之中。read所做的 # 工作,就是把内核缓冲区中的数据复制到应用层用户的buffer里面,仅此而已。进程调用 # send发送的数据的时候,一般情况下,将数据复制进入socket的内核发送缓冲区之中,然 # 后send便会在上层返回。换句话说,send返回之时,数据不一定会发送到对端去,send # 仅仅是把应用层buffer的数据复制进socket的内核发送buffer中 # TCP连接的SO_SNDBUF缓冲区大小,默认102400,单位是字节 # 如果是-1,就使用操作系统的默认值 socket.send.buffer.bytes=102400 # TCP连接的SO_RCVBUF缓冲区大小,默认102400,单位是字节 # 如果是-1,就使用操作系统的默认值 socket.receive.buffer.bytes=102400 # 请求的最大长度 socket.request.max.bytes=104857600 ####################### 下面是存储log相关的配置 ####################### # 用于存储log文件的目录,可以将多个目录通过逗号分隔,形成一个目录列表 log.dirs=/tmp/kafka-logs # 每个Topic默认的partition数量,默认值是1 num.partitions=1 # 用来恢复log文件以及关闭时将log数据刷新到磁盘的线程数量,每个目录对应 # num.recovery.threads.per.data.dir个线程 num.recovery.threads.per.data.dir=1 ################# 下面是log文件刷盘的相关配置 ################# # 每隔多少个消息触发一次flush操作,将内存中的消息刷新到硬盘上 #log.flush.interval.messages=10000 # 每隔多少毫秒触发一次flush操作,将内存中的消息刷新到硬盘上 #log.flush.interval.ms=1000 # 上面这两个配置是全局的,可以在Topic中重新设置,并覆盖这两个配置 ################# 下面是log相关的“保存策略”的配置 ##################### # 注意:下面有两种配置,一种是基于时间的策略,另一种是基于日志文件大小的策略,两种 # 策略同是配置的话,只要满足其中一种策略,则触发Log删除的操作。删除操作总是先删除 # 最旧的日志 # 消息在Kafka中保存的时间,168小时之前的log,可以被删除掉 log.retention.hours=168 # 当剩余空间低于log.retention.bytes字节,则开始删除log #log.retention.bytes=1073741824 # segment日志文件大小的上限值。当超过这个值时,会创建新的segment日志文件 # segment文件的相关信息在后面介绍 log.segment.bytes=1073741824 # 每隔300000ms,logcleaner线程将检查一次,看是否符合上述保留策略的消息可以被删除 log.retention.check.interval.ms=300000 ################# ZooKeeper的相关配置 ################## # Kafka依赖的ZooKeeper集群地址,可以配置多个ZooKeeper地址,使用逗号隔开 zookeeper.connect=localhost:2181 # ZooKeeper连接的超时时间 zookeeper.connection.timeout.ms=6000

• 配合Kafka的启动参数

Kafka服务端的入口类是kafka.Kafka。除了指定入口类,还需要指定server.properties配置文件所在的位置,即在Kafka源代码的config目录下,如图1-26所示。图1-26

第一次启动时,Intellij IDEA会重新编译整个项目,编译完成后启动。得到的日志如图1-27所示。图1-27

• 验证

为了验证上文配置的源码环境是否成功,可以使用Kafka二进制包中自带的三个脚本进行验证,分别是:kafka-topics用于创建Topic;kafka-console-producer是一个命令行Producer;kafka-console-consumer是一个命令行Consumer。

首先,使用kafka-topics.bat创建一个示例Topic——test,其中partition和replication-factor都为1,输出如图1-28所示。图1-28

之后,启动两个命令行窗口,分别执行下面两行命名,一个作为生产者,另一个作为消费者: kafka-console-producer.bat --broker-list localhost:9092 --topic test kafka-console-consumer.bat --zookeeper localhost:2181 --topic test

启动完成后,得到如图1-29所示的两个窗口。图1-29

试读结束[说明:试读内容隐藏了图片]

下载完整电子书


相关推荐

最新文章


© 2020 txtepub下载