Kafka源码解析与实战(txt+pdf+epub+mobi电子书下载)


发布时间:2021-03-06 08:05:14

点击下载

作者:王亮

出版社:机械工业出版社

格式: AZW3, DOCX, EPUB, MOBI, PDF, TXT

Kafka源码解析与实战

Kafka源码解析与实战试读:

前言

我开始接触分布式计算的时候,正好需要利用Spark结合Kafka进行流式处理。恰巧的是Kafka和Spark底层都是利用Scala语言编写的,并且当时市面上有关Kafka的中文书籍几乎没有,因此正好利用这个机会学习了Scala语言,并且通读了Kafka和Spark的源码,随后把日常的积累通过博客的形式慢慢记录下来。在这一年多的积累过程中,发现有关Kafka的中文书籍还是很缺乏,便有了总结出书的想法,而恰在这个时候吴怡编辑通过博客联系上了我,希望我把日常的积累总结成Kafka的专业性书籍,分享给更广大的从事大数据相关工作的人群。

本书将从初学者的角度出发,循序渐进地讲解Kafka内部的实现原理,但是由于Kafka是基于Scala语言编写的,因此为了更好地阅读本书,希望读者对于Scala语言有大致的了解。

阅读指南

本书将从Kafka的内部实现原理、运维工具、客户端编程以及实际应用这四个方面出发,系统阐述有关Kafka的各方面知识,全书共10章,每章的大致内容如下。

第1章介绍Kafka诞生的背景、Kafka在LinkedIn内部的应用、Kafka的主要设计目标以及为什么使用消息系统。

第2章介绍Kafka的基本组成、拓扑结构及其内部的通信协议。

第3章描述Kafka集群组成的基本元素Broker Server的启动以及内部的模块组成。通过阅读这一章,读者能对Broker Server有整体上的印象,为之后章节的阅读打下基础。

第4章描述Broker Server内部的九大基本模块:SocketServer、KafkaRequestHandlerPool、LogManager、ReplicaManager、OffsetManager、KafkaScheduler、KafkaApis、KafkaHealthcheck和TopicConfigManager。

第5章介绍Broker Server的控制管理模块KafkaController,这个模块负责整个Kafka集群的管理,例如:Topic的新建和删除、分区状态和副本状态的转换、集群的负载均衡管理等。

第6章介绍三个维护脚本:kafka-topics.sh、kafka-reassign-partitions.sh和kafka-preferred-replica-election.sh,它们分别涉及Topic的生命周期管理、Topic分区的重分配和分区首选副本的选择。

第7章从设计原则、示例代码、模块组成和发送模式四个部分介绍有关消息生产者的相关知识,从设计原则至客户端编程,从客户端编程到内部实现原理,由浅入深,循序渐进地讲解。

第8章分别介绍两种消费者:简单消费者和高级消费者。针对每种消费者都将依次从设计原则、消费者流程、示例代码以及原理解析四个部分介绍消费者的相关知识。

第9章介绍Kafka与典型大数据系统的集成,包括:Kafka和Storm的集成、Kafka和ELK的集成、Kafka和Hadoop的集成以及Kafka和Spark的集成。希望通过本章使读者对Kafka和第三方大数据平台集成有大致的了解。

第10章用综合实例描述了Kafka的应用,案例描述Kafka作为数据总线在安防整体解决方案中的作用,通过车辆人脸图片数据的入库、视频数据的入库、数据延时的监控、数据质量的监控、布控统计和容灾备份6个业务,简要阐述内部的实现原理。

本书是基于0.8.2版本的Kafka编写的,其相关配套的源码可以从Kafka的官方网站上下载,下载地址为http://kafka.apache.org/downloads,也可以从开源或者私有软件项目托管平台GitHub上下载,下载地址为https://github.com/apache/kafka。为了简化代码流程描述,笔者会将一些日志打印等不影响阅读的代码用“……”代替,如果需要知道“……”代表的实际含义,可以参考源码包中的真实代码。

本书特点

由浅入深,循序渐进:本书从LinkedIn(领英)公司内部大数据架构讲起,引出消息队列Kafka,接着讲解Kafka的基本架构,然后着重分析Kafka内部的各模块实现细节。从诞生背景至架构组成,再到内部实现细节,由浅入深,循序渐进,让读者在阅读时能够逐步了解Kafka。

由里到外,层层剖析:本书不仅讲解Kafka内部的实现原理,而且还详细描述Kafka外部的维护工具,对外的客户端编程原理以及和第三方集成的方式。由里到外,层层剖析,让读者在阅读时能够更加全面地掌握Kafka。

图文并茂,生动形象:本书在讲解Kafka的过程中穿插了大量的图片,直观地描述了工作原理,使读者在阅读时能够加深对代码的理解。

读者对象

本书适合以下人群阅读:

·想熟悉典型消息系统架构的大数据从业人员。

·想了解分布式系统开发的软件工程师。

·想掌握Kafka内部实现原理的中高级开发人员。

·想搭建传统大数据框架的系统分析师。

致谢

首先感谢我的夫人在我背后默默的付出,是她给了我动力,陪伴我度过了长达半年之久的枯燥时光,坚定了我完成此书的决心。其次感谢机械工业出版社吴怡编辑的鼓励和支持,是她促成了这本书的出版。接着感谢我的鱼儿们(布隆迪、金头虎、蓝茉莉、三间鼠和反游猫),每当我思绪混乱的时候可以静静地看着它们慢慢梳理。

在本书成书的过程中也得到了许多同事和同学的支持、鼓励,在此一并致谢。

由于作者水平及能力有限,加之时间仓促,本书难免存在错误和不妥之处,恳请广大读者批评指正,邮箱地址为:wangliang168219@126.com。第1章Kafka简介

Kafka是一个高度可扩展的消息系统,它在LinkedIn的中央数据管道中扮演着十分重要的角色,因其可水平扩展和高吞吐率而被广泛使用,现在已被多家不同类型的公司作为多种类型的数据管道和消息系统。本章将聚集于Kafka诞生的背景、Kafka在LinkedIn内部的应用、Kafka的主要设计目标和为什么使用消息系统这四个方面,简单介绍典型的消息系统Kafka,尤其在应用的时候需要多思考后两个方面:Kafka的设计目标和应用层开发为什么需要使用消息系统。并以此为切入点,为之后的章节作铺垫。1.1 Kafka诞生的背景

对于一个高效的组织,所有数据需要对该组织的所有服务和系统是可用的,以便挖掘出数据的最大价值。数据采集和数据使用是一个金字塔的结构,底部为以某种统一的方式捕获数据,这些数据需要以统一的方式建模,以方便读取和处理。捕获数据的工作做扎实后,在这个基础上以不同方法处理这些数据就变得得心应手。

数据捕获的来源主要有两种:一种是记录正在发生的事件数据。比如Web系统中的用户活动日志(用户的点击选择等)、交警行业中的违章事件等。随着传统行业业务活动的数字化,事件数据正在不断增长,而且这个趋势没有停止。这种类型的事件数据记录了已经发生的事情,往往比传统数据库应用要大好几个数量级。因此对于数据的捕获、数据的处理提出了重大的挑战;另一种是经过二次分析处理之后的数据。对捕获的数据进行二次分析处理后得到的数据也需要记录保存,这里的处理指的是利用批处理、图分析等专有的数据处理系统进行了处理,这些加工后的数据可以作为数据捕获的第二个来源。

总之,捕获的数据越来越多,如何将这些巨量的数据以可靠的、完整的数据流方式传递给数据分析处理系统也变得越来越困难。

LinkedIn使用的数据系统包括:

·全文搜索

·Social Graph(社会图谱)

·Voldemort(键值存储)

·Espresso(文档存储)

·推荐引擎

·OLAP(查询引擎)

·Hadoop

·Teradata(数据仓库)

·Ingraphs(监控图表和指标服务)

上述专用的分布式系统都需要经过数据源来获取数据,同时有些系统还会产生数据,作为其他系统的数据源。LinkedIn曾尝试为每个数据源和目标构建自定义的数据加载,很显然这是不可行的。LinkedIn有几十个数据系统和数据仓库。把这些系统和仓库联系起来,就会导致任意两两系统间构建自定义的管道,如图1-1所示。图1-1 各系统之间的自定义管道

需要注意的是,数据是双向流动的,例如许多系统(数据库、Hadoop)同时是数据传输的来源和目的端。这就意味着我们最后要为那些系统建立两个通道:一个用于数据输入,一个用于数据输出。要避免上面的问题,我们需要如图1-2所示的通用方式。

我们需要尽量将每个生产者、消费者与数据源隔离。理想情况下,生产者或消费者应该只与一个数据源单独集成,这样就能访问到所有数据。根据这个思路想到增加一个新的数据系统:

·作为数据来源或者数据目的地。

·集成工作只需要连接这个新系统到一个单独的管道,而无须连接到每个数据的生产者和消费者。图1-2 各系统之间的共享管道

这个新的数据系统就是Kafka。Kafka作为LinkedIn中的“中枢神经系统”,管理从各个应用程序汇聚到此的信息流,这些数据经过处理后再被分发到各处。Kafka作为一个消息系统,进行消息的传递;同时它也是日志存储系统,以日志的形式存储了数据源的所有数据。1.2 Kafka在LinkedIn内部的应用

LinkedIn的工程师团队已经把Kafka打造为管理信息流的开源解决方案。他们把Kafka作为消息中枢,帮助公司的各个应用以松耦合的方式在一起工作。LinkedIn已经严重依赖于Kafka,并且基于Kafka的生态系统,LinkedIn开发出了一些开源组件和公司内部组件。

Kafka在LinkedIn中的使用场景如下所述:

·系统监控:LinkedIn内所有的主机都会往Kafka发送系统健康信息和运行信息,负责展示运维信息和报警的系统则从Kafka订阅获取这些运维信息,处理后进行业务的展示和告警业务的触发。更进一步,LinkedIn通过自家的实时流处理系统Samza对这些运维数据进行实时的处理分析,生成实时的调用图分析。

·传统的消息队列:LinkedIn内大量的应用系统把Kafka作为一个分布式消息队列进行使用。这些应用囊括了搜索、内容相关性反馈等应用,这些应用将处理后的数据通过Kafka传递到Voldemort分布式存储系统。

·分析:LinkedIn会搜集所有的数据以更好地了解用户是如何使用LinkedIn的产品的。哪些网页被浏览,哪些内容被点击这样的信息都会发送到每个数据中心的Kafka。这些数据被汇总起来并通过Kafka发送到Hadoop集群进行分析和每日报表生成。

·作为其他分布式日志系统的组件:Kafka也被LinkedIn内其他分布式系统作为核心的日志组件,比如大数据仓储解决方案Pinot。Kafka也被分布式数据库Espresso用于负责数据的复制与修改。

除了Kafka的直接应用,LinkedIn还开发了一些Kafka组件以应对其他的一些使用场景:

·MirrorMaker:让Kafka集群之间能同步数据。在很多情况下LinkedIn需要做跨数据中心的操作,对这些操作的事件记录,原有的Kafka无法支持,通过MirrorMaker,Kafka也能支持跨数据中心的事件记录传递。

·RESTful接口:用户能通过RESTful接口向Kafka发布消费消息,而不需要开发Java代码的客户端。

·审计服务:事件一般是在一个数据中心产生的,有时候会有场景需要在另一个数据中心对该事件进行离线分析。为此,LindedIn会把事件消息从一个数据中心同步到另一个数据中心。在消息同步的过程中,消费消息的应用需要知道什么时候这些消息被同步完,之后应用才可以开始离线处理。审计服务保证了这一点。1.3 Kafka的主要设计目标

Kafka作为一种分布式的、基于发布/订阅的消息系统,其主要设计目标如下:

·以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上的数据也能保证常数时间的访问性能。

·高吞吐率,即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。

·支持Kafka Server间的消息分区,及分布式消费,同时保证每个分区内的消息顺序传输。

·支持离线数据处理和实时数据处理。

·支持在线水平扩展。1.4 为什么使用消息系统

回过头来看一下,我们为什么需要使用消息系统呢?其大致目的如下:

·解耦:在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这使得开发人员可以独立地扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束即可。

·冗余:有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到数据完全处理完,通过这一方式规避了数据丢失的风险。许多消息队列所采用的“插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要处理系统明确指出该消息已经被处理完毕,从而确保数据被安全保存直到使用完毕。

·扩展性:因为消息队列解耦了处理过程,所以增大消息入库和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码,不需要调节参数,扩展就像调大电力按钮一样简单。

·灵活性和峰值处理能力:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见,并且以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,不会因为突发的超负荷的请求而完全崩溃。

·可恢复性:系统的一部分组件失效时,不会影响整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

·顺序保证:在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka可保证一个分区内的消息是有序的。

·缓冲:在任何重要的系统中,都会需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率地执行,写入队列的处理会尽量的快速。该缓冲有助于控制和优化数据流经过系统的速度。

·异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理。想向队列中放入多少消息就放多少,然后在需要的时候再去处理。1.5 本章小结

本章先讲述了Kafka诞生的背景及在LinkedIn公司中的应用,接着讲述了LinkedIn设计Kafka的主要目标,本质上最重要的就是两点:高吞吐量和可水平扩展。最后讲述了为什么需要使用消息系统,也就是应用层使用消息系统可以解决的问题,并且这也是本书的读者需要不断思考的问题。希望通过以上四个方面的介绍使得读者能够对Kafka形成初步的认识,同时对自己的系统进行思考。第2章Kafka的架构

从本章起将详细探讨Kafka内部的实现原理:其中包括Kafka的基本组成、Kafka的拓扑结构以及Kafka内部的通信协议。Kafka内部的通信协议是建立在Kafka的拓扑结构之上的,而Kafka的拓扑结构是由Kafka的基本模块所组成的,因此本章是其他剩余章节的基础,是理解Kafka的关键。2.1 Kafka的基本组成

在Kafka集群中生产者将消息发送给以Topic命名的消息队列Queue中,消费者订阅发往以某个Topic命名的消息队列Queue中的消息。其中Kafka集群由若干个Broker组成,Topic由若干个Partition组成,每个Partition里面的消息通过Offset来获取。

·Broker:一台Kafka服务器就是一个Broker,一个集群由多个Broker组成,一个Broker可以容纳多个Topic,Broker和Broker之间没有Master和Standby的概念,它们之间的地位基本是平等的。

·Topic:每条发送到Kafka集群的消息都属于某个主题,这个主题就称为Topic。物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存在一个或多个Broker上,但是用户只需指定消息的主题Topic即可生产或消费数据而不需要去关心数据存放在何处。

·Partition:为了实现可扩展性,一个非常大的Topic可以被分为多个Partition,从而分布到多台Broker上。Partition中的每条消息都会被分配一个自增Id(Offset)。Kafka只保证按一个Partition中的顺序将消息发送给消费者,但是不保证单个Topic中的多个Partition之间的顺序。

·Offset:消息在Topic的Partition中的位置,同一个Partition中的消息随着消息的写入,其对应的Offset也自增,其内部实现原理如图2-1所示。图2-1 Topic、Partition和Offset解析

·Replica:副本。Topic的Partition含有N个Replica,N为副本因子。其中一个Replica为Leader,其他都为Follower,Leader处理Partition的所有读写请求,与此同时,Follower会定期地去同步Leader上的数据。

·Message:消息,是通信的基本单位。每个Producer可以向一个Topic(主题)发布一些消息。

·Producer:消息生产者,即将消息发布到指定的Topic中,同时Producer也能决定此消息所属的Partition:比如基于Round-Robin(轮询)方式或者Hash(哈希)方式等一些算法。

·Consumer:消息消费者,即向指定的Topic获取消息,根据指定Topic的分区索引及其对应分区上的消息偏移量来获取消息。

·Consumer Group:消费者组,每个Consumer属于一个Consumer Group;反过来,每个Consumer Group中可以包含多个Consumer。如果所有的Consumer都具有相同的Consumer Group,那么消息将会在Consumer之间进行负载均衡。也就是说一个Partition中的消息只会被相同Consumer Group中的某个Consumer消费,每个Consumer Group消息消费是相互独立的。如果所有的Consumer都具有不同的Consumer Group,则消息将会被广播给所有的Consumer。Producer、Consumer和Consumer Group之间的关系如图2-2所示。图2-2 Producer、Consumer和Consumer Group三者的关系

·Zookeeper:存放Kafka集群相关元数据的组件。在Zookeeper集群中会保存Topic的状态信息,例如分区的个数、分区的组成、分区的分布情况等;保存Broker的状态信息;保存消费者的消费信息等。通过这些信息,Kafka很好地将消息生产、消息存储、消息消费的过程结合起来。2.2 Kafka的拓扑结构

一个典型的Kafka集群的拓扑结构如图2-3所示。图2-3 Kafka拓扑结构

一个典型的Kafka集群中包含若干个Producer(可以是某个模块下发的Command,或者是Web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干个Broker(Kafka集群支持水平扩展,一般Broker数量越多,整个Kafka集群的吞吐率也就越高),若干个Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置。Producer使用Push模式将消息发布到Broker上,Consumer使用Pull模式从Broker上订阅并消费消息。

一个简单的消息发送流程如图2-4所示。

1)Producer根据指定的路由方法(Round-Robin、Hash等),将消息Push到Topic的某个Partition里面。

2)Kafka集群接收到Producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关注消息是否被消费。

3)Consumer从Kafka集群Pull数据,并控制获取消息的Offset。图2-4 消息发送的简易流程2.3 Kafka内部的通信协议

Kafka内部各个Broker之间的角色并不是完全相等的,Broker内部负责管理分区和副本状态以及异常情况下分区的重新分配等这些功能的模块称为KafkaController。每个Kafka集群中有且只有1个Leader状态的KafkaController,当Leader状态的KafkaController出现异常时,其余的Standby状态下的KafkaController会通过Zookeeper选举出又一个Leader状态的KafkaController,因此Broker又可根据KafkaController模块的状态进行进一步的细分。

在一个正常运行的Kafka集群中,生产者和Broker之间,消费者和Broker之间,Broker和Broker之间不断地在进行不同网络协议的交互,正是由于各个组件不断地进行网络交互才维持了整个集群稳定的运行。其主要的网络通信协议如图2-5所示。

利用命令bin/kafka-topics.sh--create--zookeeper localhost:2181--replication-factor2--partitions3--topic my_topic在图2-5中创建了以my_topic命名的消息主题,其分区个数为3个,每个分区的副本因子为2,其中分区0的第一个副本和分区2的第二个副本位于Broker1上,分别以/my_topic/partition-0/replica0和/my_topic/partition-2/replica1表示;分区1的第一个副本和分区0的第二个副本位于Broker2上,分别以/my_topic/partition-1/replica0和/my_topic/partition-0/replica1表示;分区2的第一个副本和分区1的第二个副本位于Broker3上,分别以/my_topic/partition-2/replica0和/my_topic/partition-1/replica1表示。假设每个分区的replica0为对应分区的副本Leader,其余为Follower,并且此时Broker1的KafkaController模块状态为Leader,其余为Standy。在此基础上将分两个维度来阐述图2-5所表达的意思:图2-5 Kafka内部的通信协议维度一:通信协议详情

·ProducerRequest:生产者发送消息的请求,生产者将消息发送至Kafka集群中的某个Broker,Broker接收到此请求后持久化此消息并更新相关元数据信息。

·TopicMetadataRequest:获取Topic元数据信息的请求,无论是生产者还是消费者都需要通过此请求来获取感兴趣的Topic的元数据。

·FetchRequest:消费者获取感兴趣Topic的某个分区的消息的请求,除此之外,分区状态为Follower的副本也需要利用此请求去同步分区状态为Leader的对应副本数据。

·OffsetRequest:消费者发送至Kafka集群来获取感兴趣Topic的分区偏移量的请求,通过此请求可以获知当前Topic所有分区在不同时间段的偏移量详情。

·OffsetCommitRequest:消费者提交Topic被消费的分区偏移量信息至Broker,Broker接收到此请求后持久化相关偏移量信息。

·OffsetFetchRequest:消费者发送获取提交至Kafka集群的相关Topic被消费的详细信息,和OffsetCommitRequest相互对应。

·LeaderAndIsrRequest:当Topic的某个分区状态发生变化时,处于Leader状态的KafkaController发送此请求至相关的Broker,通知其做出相应的处理。

·StopReplicaRequest:当Topic的某个分区被删除或者下线的时候,处于Leader状态的KafkaController发送此请求至相关的Broker,通知其做出相应的处理。

·UpdateMetadataRequest:当Topic的元数据信息发生变化时,处于Leader状态的KafkaController发送此请求至相关的Broker,通知其做出相应的处理。

·BrokerControlledShutdownRequest:当Broker正常下线时,发生此请求至处于Leader状态的KafkaController。

·ConsumerMetadataRequest:获取保存特定Consumer Group消费详情的分区信息。维度二:通信协议交互

·Producer和Kafka集群:Producer需要利用ProducerRequest和TopicMetadataRequest来完成Topic元数据的查询、消息的发送。

·Consumer和Kafka集群:Consumer需要利用TopicMetadataRequest请求、FetchRequest请求、OffsetRequest请求、OffsetCommitRequest请求、OffsetFetchRequest请求和ConsumerMetadataRequest请求来完成Topic元数据的查询、消息的订阅、历史偏移量的查询、偏移量的提交、当前偏移量的查询。

·KafkaController状态为Leader的Broker和KafkaController状态为Standby的Broker:KafkaController状态为Leader的Broker需要利用LeaderAndIsrRequest请求、Stop-ReplicaRequest请求、UpdateMetadataRequest请求来完成对Topic的管理;Kafka-Controller状态为Standby的Broker需要利用BrokerControlledShutdownRequest请求来通知KafkaController状态为Leader的Broker自己的下线动作。

·Broker和Broker之间:Broker相互之间需要利用FetchRequest请求来同步Topic分区的副本数据,这样才能使Topic分区各副本数据实时保持一致。2.4 本章小结

本章主要讲解了有关Kafka消息系统的基本组成、拓扑结构和通信协议。Kafka消息系统主要由若干个Broker Server组成,其中生产者向指定的Topic发送消息,消费者订阅发往某个Topic的消息。因此Kafka消息系统内部的通信协议主要区分为三种:生产者和Broker之间,消费者和Broker之间以及Broker和Broker之间。掌握Kafka消息系统内部的通信协议是理解Kafka内部实现原理的关键,Kafka消息系统内部的各个模块都是通过其内部通信协议相互联系起来的。第3章Broker概述

回顾第2章Kafka的架构,Kafka集群是由若干个Broker组成的,Broker和Broker之间,Broker和生产者之间,Broker和消费者之间都存在不同的交互。因此本章将从Broker的启动脚本开始描述Broker的启动过程,以及基本阐述启动之后Broker内部存在的各个功能模块,包括SocketServer、KafkaRequestHandlerPool、LogManager、ReplicaManager、OffsetManager、KafkaScheduler、KafkaApis、KafkaHealthcheck和TopicConfigManager九大基本模块以及KafkaController集群控制管理模块。3.1 Broker的启动

Kafka的安装包目录结构如图3-1所示。图3-1 Kafka安装包的目录结构

bin目录存放的是Kafka提供的管理工具,其中包括Broker的启动脚本;config目录存放的是Broker的配置文件;libs目录存放的是相关的jar包。

进入bin目录,执行以下命令后台启动Broker:nohup ./bin/kafka-server-start.sh conf ig/server.properties &

可见Broker是通过脚本kafka-server-start.sh调用起来的,继续查看这个脚本的内容,如下所示:if [ $# -lt 1 ];then echo "USAGE: $0 [-daemon] server.properties" exit 1fibase_dir=$(dirname $0)// 省略中间步骤exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka $@

最终执行的是kafka.Kafka这个类,即内部package kafka里面的Kafka类。查看源码中关于这个类的详情,如下所示:object Kafka extends Logging { def main(args: Array[String]): Unit = { if (args.length != 1) { println("USAGE: java [options] %s server.properties".format(classOf[KafkaServer].getSimpleName())) System.exit(1) } try { val props = Utils.loadProps(args(0)) val serverConfig = new KafkaConfig(props) KafkaMetricsReporter.startReporters(serverConfig.props) val kafkaServerStartable = new KafkaServerStartable(serverConfig) Runtime.getRuntime().addShutdownHook(new Thread() { override def run() = { kafkaServerStartable.shutdown } }) kafkaServerStartable.startup kafkaServerStartable.awaitShutdown } catch { case e: Throwable => fatal(e) } System.exit(0) }}

程序在kafkaServerStartable.awaitShutdown停住,如果继续走下去,那么Broker就退出了。注意

在Scala语言中,class类对象中不可有静态变量和静态方法,但是提供了“伴生对象”的功能:在和类的同一个文件中定义同名的object对象,所有的main方法都必须在object中被调用,来提供程序的主入口,十分简单。

其中上面的kafkaServerStartable封装了KafkaServer,最终执行startup的是KafkaServer,如下代码所示:class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging { private val server = new KafkaServer(serverConfig) def startup() { try { server.startup() AppInfo.registerInfo() } catch { case e: Throwable => fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e) System.exit(1) } } ......}

在这里终于见到了Broker启动过程中最关键的类KafkaServer,接下来将围绕KafkaServer讲解里面的模块组成。3.2 Broker内部的模块组成

首先,我们来看KafkaServer这个类包含的模块:class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup { this.logIdent = "[Kafka Server " + config.brokerId + "], " private var isShuttingDown = new AtomicBoolean(false) private var shutdownLatch = new CountDownLatch(1) private var startupComplete = new AtomicBoolean(false) val brokerState: BrokerState = new BrokerState val correlationId: AtomicInteger = new AtomicInteger(0) var socketServer: SocketServer = null var requestHandlerPool: KafkaRequestHandlerPool = null var logManager: LogManager = null var offsetManager: OffsetManager = null var kafkaHealthcheck: KafkaHealthcheck = null var topicConfigManager: TopicConfigManager = null var replicaManager: ReplicaManager = null var apis: KafkaApis = null var kafkaController: KafkaController = null val kafkaScheduler = new KafkaScheduler(config.backgroundThreads) var zkClient: ZkClient = null ......}

分别是SocketServer(监听Socket请求)、KafkaRequestHandlerPool(请求处理资源池)、LogManager(日志管理)、ReplicaManager(分区副本管理)、OffsetManager(偏移量管理)、KafkaScheduler(后台任务调度资源池)、KafkaApis(业务逻辑实现层)、KafkaHealthcheck(提供Broker健康状态)、TopicConfigManager(Topic配置信息管理)和KafkaController(Kafka集群控制管理)。其相互之间的关系如图3-2所示。

详细说明如下:

·SocketServer:首先开启1个Acceptor线程用于监听默认端口号为9092上的Socket链接,然后当有新的Socket链接成功建立时会将对应的SocketChannel以轮询的方式转发给N个Processor线程中的某一个,并由其处理接下来该SocketChannel上的读写请求,其中N=num.network.threads,默认为3。当Processor线程监听来自SocketChannel的请求时,会将请求放置在RequestChannel中的请求队列;当Processor线程监听到SocketChannel请求的响应时,会将响应从RequestChannel中的响应队列中取出来并发送给客户端。

·KafkaRequestHandlerPool:真正处理Socket请求的线程池,其个数默认为8个,由参数num.io.threads决定。该线程池里面的线程KafkaRequestHandler从RequestChannel的请求队列中获取Socket的请求,然后调用KafkaApis完成真正的业务逻辑,最后将响应写回至RequestChannel中的响应队列,并交由SocketServer中对应的Processor线程发送给客户端。

·LogManager:Kafka的日志管理模块。主要提供删除任何过期数据和冗余数据,刷新脏数据,对日志文件进行Checkpoint以及日志合并的功能。

·ReplicaManager:Kafka的副本管理模块。主要提供针对Topic分区副本数据的管理功能,包括有关副本的Leader和ISR的状态变化、副本的删除、副本的监测等。其中ISR全称为In-Sync Replicas,即处于同步状态的副本,在后面的章节还会详细介绍。

·OffsetManager:Kafka的偏移量管理模块。主要提供针对偏移量的保存和读取的功能,Kafka管理Topic的偏移量存在两种方式:一种为Zookeeper,就是把偏移量提交至Zookeeper上;另一种为Kafka,就是把偏移量提交至Kafka内部Topic为“__consumer_offsets”的日志里面,主要由offsets.storage参数决定,默认为zookeeper。

·KafkaScheduler:Kafka的后台任务调度资源池。提供后台定期任务的调度,主要为LogManager、ReplicaManager和OffsetManager提供调度服务。

·KafkaApis:Kafka的业务逻辑实现层,根据不同的Request执行不同的操作,其中利用LogManager、OffsetManager和ReplicaManager来完成内部的处理。KafkaApis处理的请求包括ProducerRequest、TopicMetadataRequest、FetchRequest、OffsetRequest、OffsetCommitRequest、OffsetFetchRequest、LeaderAndIsrRequest、StopReplicaRequest、UpdateMetadataRequest、BrokerControlledShutdownRequest和ConsumerMetadataRequest。图3-2 Broker内部模块之间的关系

·KafkaHealthcheck:Broker Server在/brokers/ids上注册自己的ID,当Broker在线的时候,则对应的ID存在;当Broker离线的时候,则对应的ID不存在,以此来达到集群状态监测的目的。

·TopicConfigManager:在/config/changes上注册自己的回调函数来监测Topic配置信息的变化。

·KafkaController:Kafka的集群控制管理模块。由于Zookeeper上保存了Kafka集群的元数据信息,因此KafkaController通过在不同目录注册不同的回调函数来达到监测集群状态的目的,及时响应集群状态的变化。比如说:

1)/controller目录保存了Kafka集群中状态为Leader的KafkaController标识,通过监测这个目录的变化可以及时响应KafkaController状态的切换;

2)/admin/reassign_partitions目录保存了Topic重分区的信息,通过监测这个目录的变化可以及时响应Topic分区变化的请求;

3)/admin/preferred_replica_election目录保存了Topic分区副本的信息,通过监测这个目录的变化可以及时响应Topic分区副本变化的请求;

4)/brokers/topics目录保存了Topic的信息,通过监测这个目录的变化可以及时响应Topic创建和删除的请求;

5)/brokers/ids目录保存了Broker的状态,通过监测这个目录的变化可以及时响应Broker的上下线情况等。

希望读者着重理解Broker内部各个模块之间的相互关系,这对于从整体上把握Kafka架构会起到比较大的作用。在接下来的章节中将会针对以上每个模块进行进一步的讲解,如果在这一章节对某些概念不是很理解的话,也不要紧,可以继续往后看,然后再回过头看整体图,以便加深记忆。3.3 本章小结

本章大致讲述了Broker的脚本启动过程以及Broker内部的模块组成。Broker的模块组成主要区分为基本模块和控制管理模块,其中基本模块在每个Broker内部无论对内还是对外都提供服务,但是控制管理模块在整个Kafka集群中有且只有一个对外提供服务。在接下去的第4章和第5章将分别描述基本模块和控制管理模块的内部实现原理。第4章Broker的基本模块

Broker由以下九个基本模块组成:SocketServer(监听Socket请求)、KafkaRequestHandler-Pool(请求处理资源池)、LogManager(日志管理)、ReplicaManager(分区副本管理)、OffsetManager(偏移量管理)、KafkaScheduler(后台任务调度资源池)、KafkaApis(业务逻辑实现层)、KafkaHealthcheck(提供Broker健康状态)、TopicConfigManager(Topic配置信息管理)。本章将层层剖析这九大基本模块内部的实现原理,慢慢揭开它们的面纱,并且逐渐暴露内部的配置参数,使得读者可以更好地掌握它们。4.1 SocketServer

SocketServer作为Broker对外提供Socket服务的模块,主要用于接收Socket连接的请求,然后产生相应为之服务的SocketChannel对象,通过此对象来和客户端相互通信。SocketServer的组成如下:class SocketServer(val brokerId: Int, val host: String, val port: Int, val numProcessorThreads: Int, val maxQueuedRequests: Int, val sendBufferSize: Int, val recvBufferSize: Int, val maxRequestSize: Int = Int.MaxValue, val maxConnectionsPerIp: Int = Int.MaxValue, val connectionsMaxIdleMs: Long, val maxConnectionsPerIpOverrides: Map[String, Int] ) extends Logging with KafkaMetricsGroup { this.logIdent = "[Socket Server on Broker " + brokerId + "], " private val time = SystemTime private val processors = new Array[Processor](numProcessorThreads) @volatile private var acceptor: Acceptor = null val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests) ……}

它内部主要包括三个模块:1)Acceptor主要用于监听Socket的连接;2)Processor主要用于转发Socket的请求和响应;3)RequestChannel主要用于缓存Socket的请求和响应。

Acceptor的初始化过程如下:private[kafka] class Acceptor(val host: String, val port: Int, private val processors: Array[Processor], val sendBufferSize: Int, val recvBufferSize: Int, connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) { // 开启Socket服务 val serverChannel = openServerSocket(host, port) def run() { // 注册Accept事件 serverChannel.register(selector, SelectionKey.OP_ACCEPT); startupComplete() var currentProcessor = 0 // 监听Accept事件 while(isRunning) { val ready = selector.select(500) if(ready > 0) { val keys = selector.selectedKeys() val iter = keys.iterator() while(iter.hasNext && isRunning) { var key: SelectionKey = null try { key = iter.next iter.remove() if(key.isAcceptable) accept(key, processors(currentProcessor)) else throw new IllegalStateException("Unrecognized key state for acceptor thread.") currentProcessor = (currentProcessor + 1) % processors.length } catch { case e: Throwable => error("Error while accepting connection", e) } } } } debug("Closing server socket and selector.") swallowError(serverChannel.close()) swallowError(selector.close()) shutdownComplete() } ......}

其主要步骤如下:

1)开启Socket服务。

2)注册Accept事件。

3)监听此ServerChannel上的ACCEPT事件,当其发生时,将其以轮询(Round Robin)的方式把对应的SocketChannel转交给Processor处理线程。注意

OP_ACCEPT为NIO中的事件,当此事件发生时,表示服务器监听到了客户连接,服务器可以接收这个连接了。

Processor的初始化过程大致如下:private[kafka] class Processor(val id: Int, val time: Time, val maxRequestSize: Int, val aggregateIdleMeter: Meter, val idleMeter: Meter, val totalProcessorThreads: Int, val requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, val connectionsMaxIdleMs: Long) extends AbstractServerThread(connectionQuotas) { private val newConnections = new ConcurrentLinkedQueue[SocketChannel]() override def run() { startupComplete() while(isRunning) { // 针对新的连接,注册其上的OP_READ事件 configureNewConnections() // 从RequestChannel获取响应产生OP_WRITE事件 processNewResponses() val startSelectTime = SystemTime.nanoseconds val ready = selector.select(300) currentTimeNanos = SystemTime.nanoseconds val idleTime = currentTimeNanos - startSelectTime idleMeter.mark(idleTime) aggregateIdleMeter.mark(idleTime / totalProcessorThreads) trace("Processor id " + id + " selection time = " + idleTime + " ns") // 监听selector上的OP_READ事件和OP_WRITE事件 if(ready > 0) { val keys = selector.selectedKeys() val iter = keys.iterator() while(iter.hasNext && isRunning) { var key: SelectionKey = null try { key = iter.next iter.remove() if(key.isReadable) read(key) else if(key.isWritable) write(key) else if(!key.isValid) close(key) else throw new IllegalStateException("Unrecognized key state for processor thread.") } catch { case e: EOFException => { info("Closing socket connection to %s.".format(channelFor(key). socket.getInetAddress)) close(key) } case e: InvalidRequestException => { info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage)) close(key) } case e: Throwable => { error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e) close(key) } } } } maybeCloseOldestConnection } debug("Closing selector.") closeAll() swallowError(selector.close()) shutdownComplete() }

其中newConnections保存了由Acceptor线程转移过来的SocketChannel对象,主要步骤如下:

1)当有新的SocketChannel对象进来的时候,注册其上的OP_READ事件以便接收客户端的请求。

2)从RequestChannel中的响应队列获取对应客户端请求的响应,然后产生OP_WRITE事件。

3)监听selector上的事件。如果是读事件,说明有新的request到来,需要转移给RequestChannel的请求队列;如果是写事件,说明之前的request已经处理完毕,需要从RequestChannel的响应队列获取响应并发送回客户端;如果是关闭事件,说明客户端已经关闭了该

试读结束[说明:试读内容隐藏了图片]

下载完整电子书


相关推荐

最新文章


© 2020 txtepub下载