你能说出 Kafka 这些原理吗
快
节奏的人。周末我听到了这么一个事情:在我的大学里,有这样一个人,他是班长。家里有些钱,可能接触社会比较早,为人处事比较成熟,和导员关系非常好,可以说好到经常性的不来上课,导员竟然还帮忙写假条;好到同学评价班长的时候,打分60分导员竟然私自帮忙改到80分。我说了一句话:这是社会催化的产物,校园也是一个小社会,很多事情从这个小社会就开始变质了。好好做人,别总是想寻求捷径
。这句话像茶一样,慢慢品才香。Kafka 是如何进行复制的 Kafka 是如何处理来自生产者和消费者的请求的 Kafka 的存储细节是怎样的
集群成员间的关系
主机(broker)
,每个 broker 都会有一个 broker.id
,每个 broker.id 都有一个唯一的标识符用来区分,这个标识符可以在配置文件里手动指定,也可以自动生成。Kafka 可以通过 broker.id.generation.enable 和 reserved.broker.max.id 来配合生成新的 broker.id。 broker.id.generation.enable参数是用来配置是否开启自动生成 broker.id 的功能,默认情况下为true,即开启此功能。自动生成的broker.id有一个默认值,默认值为1000,也就是说默认情况下自动生成的 broker.id 从1001开始。
/brokers/ids
路径下注册一个与当前 broker 的 id 相同的临时节点。Kafka 的健康状态检查就依赖于此节点。当有 broker 加入集群或者退出集群时,这些组件就会获得通知。如果你要启动另外一个具有相同 ID 的 broker,那么就会得到一个错误 —— 新的 broker 会试着进行注册,但不会成功,因为 ZooKeeper 里面已经有一个相同 ID 的 broker。 在 broker 停机、出现分区或者长时间垃圾回收停顿时,broker 会从 ZooKeeper 上断开连接,此时 broker 在启动时创建的临时节点会从 ZooKeeper 中移除。监听 broker 列表的 Kafka 组件会被告知该 broker 已移除。 在关闭 broker 时,它对应的节点也会消失,不过它的 ID 会继续存在其他数据结构中,例如主题的副本列表中,副本列表复制我们下面再说。在完全关闭一个 broker 之后,如果使用相同的 ID 启动另一个全新的 broker,它会立刻加入集群,并拥有一个与旧 broker 相同的分区和主题。
Broker Controller 的作用
znode
节点的问题。znode
,znode 节点是一种树形的文件结构,它很像 Linux 操作系统的文件路径,ZooKeeper 的根节点是 /
。Watcher
机制:当数据发生变化的时候, ZooKeeper 会产生一个 Watcher 事件,并且会发送到客户端。Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 Zookeeper 上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于 ZooKeeper 实现分布式锁、集群管理等功能。控制器的选举
/controller
让自己成为 controller 控制器。其他 broker 在启动时也会尝试创建这个节点,但是由于这个节点已存在,所以后面想要创建 /controller 节点时就会收到一个 节点已存在 的异常。然后其他 broker 会在这个控制器上注册一个 ZooKeeper 的 watch 对象,/controller
节点发生变化时,其他 broker 就会收到节点变更通知。这种方式可以确保只有一个控制器存在。那么只有单独的节点一定是有个问题的,那就是单点问题
。控制器的作用
组件
被设计用来干什么?别着急,接下来我们就要说一说。控制器相当于部门(集群)中的部门经理(broker controller),用于管理部门中的部门成员(broker) 控制器是所有 broker 的一个监视器,用于监控 broker 的上线和下线 在 broker 宕机后,控制器能够选举新的分区 Leader 控制器能够和 broker 新选取的 Leader 发送消息
主题管理
: Kafka Controller 可以帮助我们完成对 Kafka 主题创建、删除和增加分区的操作,简而言之就是对分区拥有最高行使权。
分区重分配
: 分区重分配主要是指,kafka-reassign-partitions 脚本提供的对已有主题分区进行细粒度的分配功能。这部分功能也是控制器实现的。Prefered 领导者选举
: Preferred 领导者选举主要是 Kafka 为了避免部分 Broker 负载过重而提供的一种换 Leader 的方案。集群成员管理
: 主要管理 新增 broker、broker 关闭、broker 宕机数据服务
: 控制器的最后一大类工作,就是向其他 broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。这些数据我们会在下面讨论
broker controller 数据存储
broker 上的所有信息,包括 broker 中的所有分区,broker 所有分区副本,当前都有哪些运行中的 broker,哪些正在关闭中的 broker 。 所有主题信息,包括具体的分区信息,比如领导者副本是谁,ISR 集合中有哪些副本等。 所有涉及运维任务的分区。包括当前正在进行 Preferred 领导者选举以及分区重分配的分区列表。
broker controller 故障转移
/brokers/ids
下创建节点的 broker 作为 broker controller,也就是说 broker controller 只有一个,那么必然会存在单点失效问题。kafka 为考虑到这种情况提供了故障转移
功能,也就是 Fail Over
。如下图注意:ZooKeeper 中存储的不是缓存信息,broker 中存储的才是缓存信息。
broker controller 存在的问题
controller 状态的更改由不同的监听器并发执行,因此需要进行很复杂的同步,并且容易出错而且难以调试。 状态传播不同步,broker 可能在时间不确定的情况下出现多种状态,这会导致不必要的额外的数据丢失 controller 控制器还会为主题删除创建额外的 I/O 线程,导致性能损耗 controller 的多线程设计还会访问共享数据,我们知道,多线程访问共享数据是线程同步最麻烦的地方,为了保护数据安全性,控制器不得不在代码中大量使用ReentrantLock 同步机制,这就进一步拖慢了整个控制器的处理速度。
broker controller 内部设计原理
Event Executor Thread
,事件执行线程,从图中可以看出,不管是 Event Queue 事件队列还是 Controller context 控制器上下文都会交给事件执行线程进行处理。将原来执行的操作全部建模成一个个独立的事件,发送到专属的事件队列中,供此线程消费。异步操作
。ZooKeeper API 提供了两种读写的方式:同步和异步。之前控制器操作 ZooKeeper 都是采用的同步方式,这次把同步方式改为异步,据测试,效率提升了10倍。副本机制
备份机制(Replication)
,通常指分布式系统在多台网络交互的机器上保存有相同的数据备份/拷贝。Leader(领导者)
副本,一种是Follower(跟随者)
副本。Leader 副本
Follower 副本
Follower 副本
,Follower 不对外提供服务。下面是 Leader 副本的工作方式Kafka 中,Follower 副本也就是追随者副本是不对外提供服务的。这就是说,任何一个追随者副本都不能响应消费者和生产者的请求。所有的请求都是由领导者副本来处理。或者说,所有的请求都必须发送到 Leader 副本所在的 broker 中,Follower 副本只是用作数据拉取,采用 异步拉取
的方式,并写入到自己的提交日志中,从而实现与 Leader 的同步当 Leader 副本所在的 broker 宕机后,Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并开启新一轮的选举,从追随者副本中选一个作为 Leader。如果宕机的 broker 重启完成后,该分区的副本会作为 Follower 重新加入。
不同步
的。如果一个副本没有与领导者同步,那么在领导者掉线后,这个副本将不会称为领导者,因为这个副本的消息不是全部的。同步的副本
。也就是说,如果领导者掉线,那么只有同步的副本能够称为领导者。能够立刻看到写入的消息,就是你使用生产者 API 成功向分区写入消息后,马上使用消费者就能读取刚才写入的消息 能够实现消息的幂等性,啥意思呢?就是对于生产者产生的消息,在消费者进行消费的时候,它每次都会看到消息存在,并不会存在消息不存在的情况
同步复制和异步复制
发送 - 等待
机制的,这是一种同步的复制方式,那么为什么说跟随者副本同步领导者副本的时候是一种异步操作呢?producer 通知 ZooKeeper 识别领导者 producer 向领导者写入消息 领导者收到消息后会把消息写入到本地 log 跟随者会从领导者那里拉取消息 跟随者向本地写入 log 跟随者向领导者发送写入成功的消息 领导者会收到所有的跟随者发送的消息 领导者向 producer 发送写入成功的消息
ISR
(a set of In-Sync Replicas),简称ISR
,ISR 也是一个很重要的概念,我们之前说过,追随者副本不提供服务,只是定期的异步拉取领导者副本的数据而已,拉取这个操作就相当于是复制,ctrl-c + ctrl-v
大家肯定用的熟。那么是不是说 ISR 集合中的副本消息的数量都会与领导者副本消息数量一样呢?那也不一定,判断的依据是 broker 中参数 replica.lag.time.max.ms
的值,这个参数的含义就是跟随者副本能够落后领导者副本最长的时间间隔。Unclean 领导者选举
unclean.leader.election.enable
的话,下一个领导者就会在这些非同步的副本中选举。这种选举也叫做Unclean 领导者选举
。Kafka 请求处理流程
请求/响应
式的,我猜测你接触最早的请求/响应的方式应该就是 HTTP 请求了。事实上,HTTP 请求可以是同步可以是异步的。一般正常的 HTTP 请求都是同步的,同步方式最大的一个特点是提交请求->等待服务器处理->处理完毕返回 这个期间客户端浏览器不能做任何事。而异步方式最大的特点是 请求通过事件触发->服务器处理(这时浏览器仍然可以做其他事情)-> 处理完毕。这里需要注意一点,我们只是使用 HTTP 请求来举例子,而 Kafka 采用的是 TCP 基于 Socket 的方式进行通讯
吞吐量太差
,资源利用率极低,由于只能顺序处理请求,因此,每个请求都必须等待前一个请求处理完毕才能得到处理。这种方式只适用于请求发送非常不频繁的系统
。响应式模型
响应式(Reactor)模型
,那么什么是响应式模型呢?简单的说,Reactor 模式是事件驱动架构的一种实现方式,特别适合应用于处理多个客户端并发向服务器端发送请求的场景,如下图所示:Request type (也就是 API Key) Request version(broker 可以处理不同版本的客户端请求,并根据客户版本做出不同的响应) Correlation ID --- 一个具有唯一性的数字,用于标示请求消息,同时也会出现在响应消息和错误日志中(用于诊断问题) Client ID --- 用于标示发送请求的客户端
Acceptor
线程,这个线程会创建一个连接,并把它交给 Processor(网络线程池)
, Processor 的数量可以使用 num.network.threads
进行配置,其默认值是3,表示每台 broker 启动时会创建3个线程,专门处理客户端发送的请求。轮询
的方式将入栈请求公平的发送至网络线程池中,因此,在实际使用过程中,这些线程通常具有相同的机率被分配到待处理请求队列
中,然后从响应队列
获取响应消息,把它们发送给客户端。Processor 网络线程池中的请求 - 响应的处理还是比较复杂的,下面是网络线程池中的处理流程图:共享请求队列
,因为网络线程池是多线程机制的,所以请求队列的消息是多线程共享的区域,然后由 IO 线程池进行处理,根据消息的种类判断做何处理,比如 PRODUCE
请求,就会将消息写入到 log 日志中,如果是FETCH
请求,则从磁盘或者页缓存中读取消息。也就是说,IO线程池是真正做判断,处理请求的一个组件。在IO 线程池处理完毕后,就会判断是放入响应队列
中还是 Purgatory
中,Purgatory 是什么我们下面再说,现在先说一下响应队列,响应队列是每个线程所独有的,因为响应式模型中不会关心请求发往何处,因此把响应回传的事情就交给每个线程了,所以也就不必共享了。注意:IO 线程池可以通过 broker 端参数 num.io.threads
来配置,默认的线程数是8,表示每台 broker 启动后自动创建 8 个IO 处理线程。
请求类型
acks
这个配置项的含义all
,那么这些请求会被保存在 炼狱(Purgatory)
的缓冲区中,直到领导者副本发现跟随者副本都复制了消息,响应才会发送给客户端。零复制
技术向客户端发送消息,Kafka 会直接把消息从文件中发送到网络通道中,而不需要经过任何的缓冲区,从而获得更好的性能。上限
指的是客户端为接受足够消息分配的内存空间,这个限制比较重要,如果上限太大的话,很有可能直接耗尽客户端内存。下限
可以理解为攒足了数据包再发送的意思,这就相当于项目经理给程序员分配了 10 个bug,程序员每次改一个 bug 就会向项目经理汇报一下,有的时候改好了有的时候可能还没改好,这样就增加了沟通成本和时间成本,所以下限值得就是程序员你改完10个 bug 再向我汇报!!!如下图所示:拉取消息
---> 消息
之间是有一个等待消息积累这么一个过程的,这个消息积累你可以把它想象成超时时间,不过超时会跑出异常,消息积累超时后会响应回执。延迟时间可以通过 replica.lag.time.max.ms
来配置,它指定了副本在复制消息时可被允许的最大延迟时间。非分区首领
的错误响应;如果针对某个分区的请求被发送到不含有领导者的 broker 上,也会出现同样的错误。Kafka 客户端需要把请求和响应发送到正确的 broker 上。这不是废话么?我怎么知道要往哪发送?元数据请求
,这种请求会包含客户端感兴趣的主题列表,服务端的响应消息指明了主题的分区,领导者副本和跟随者副本。元数据请求可以发送给任意一个 broker,因为所有的 broker 都会缓存这些信息。metadata.max.age.ms
参数来配置,从而知道元数据是否发生了变更。比如,新的 broker 加入后,会触发重平衡,部分副本会移动到新的 broker 上。这时候,如果客户端收到 不是首领
的错误,客户端在发送请求之前刷新元数据缓存。Kafka 重平衡流程
群组协调者(Coordinator)
的,而重平衡的流程就是由 Coordinator 的帮助下来完成的。消费者订阅的任何主题发生变化 消费者数量发生变化 分区数量发生变化 如果你订阅了一个还尚未创建的主题,那么重平衡在该主题创建时发生。如果你订阅的主题发生删除那么也会发生重平衡 消费者被群组协调器认为是 DEAD
状态,这可能是由于消费者崩溃或者长时间处于运行状态下发生的,这意味着在配置合理时间的范围内,消费者没有向群组协调器发送任何心跳,这也会导致重平衡的发生。
在了解重平衡之前,你需要知道这两个角色
群组协调器(Coordinator)
:群组协调器是一个能够从消费者群组中收到所有消费者发送心跳消息的 broker。在最早期的版本中,元数据信息是保存在 ZooKeeper 中的,但是目前元数据信息存储到了 broker 中。每个消费者组都应该和群组中的群组协调器同步。当所有的决策要在应用程序节点中进行时,群组协调器可以满足 JoinGroup
请求并提供有关消费者组的元数据信息,例如分配和偏移量。群组协调器还有权知道所有消费者的心跳,消费者群组中还有一个角色就是领导者,注意把它和领导者副本和 kafka controller 进行区分。领导者是群组中负责决策的角色,所以如果领导者掉线了,群组协调器有权把所有消费者踢出组。因此,消费者群组的一个很重要的行为是选举领导者,并与协调器读取和写入有关分配和分区的元数据信息。消费者领导者
:每个消费者群组中都有一个领导者。如果消费者停止发送心跳了,协调者会触发重平衡。在了解重平衡之前,你需要知道状态机是什么
消费者组状态机(State Machine)
,来帮助协调者完成整个重平衡流程。消费者状态机主要有五种状态它们分别是 Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。消费者组一开始处于 Empty
状态,当重平衡开启后,它会被置于PreparingRebalance
状态等待新消费者的加入,一旦有新的消费者加入后,消费者群组就会处于CompletingRebalance
状态等待分配,只要有新的消费者加入群组或者离开,就会触发重平衡,消费者的状态处于 PreparingRebalance 状态。等待分配机制指定好后完成分配,那么它的流程图是这样的
在上图的基础上,当消费者群组都到达 Stable
状态后,一旦有新的消费者加入/离开/心跳过期,那么触发重平衡,消费者群组的状态重新处于 PreparingRebalance 状态。那么它的流程图是这样的。
在上图的基础上,消费者群组处于 PreparingRebalance 状态后,很不幸,没人玩儿了,所有消费者都离开了,这时候还可能会保留有消费者消费的位移数据,一旦位移数据过期或者被刷新,那么消费者群组就处于 Dead
状态了。它的流程图是这样的:
在上图的基础上,我们分析了消费者的重平衡,在 PreparingRebalance
或者CompletingRebalance
或者Stable
任意一种状态下发生位移主题分区 Leader 发生变更,群组会直接处于 Dead 状态,它的所有路径如下
这里面需要注意两点: 一般出现 Required xx expired offsets in xxx milliseconds 就表明Kafka 很可能就把该组的位移数据删除了 只有 Empty 状态下的组,才会执行过期位移删除的操作。
重平衡流程
Rebalance
的过程。重平衡过程可以从两个方面去看:消费者端和协调者端,首先我们先看一下消费者端从消费者看重平衡
消费者加入组
和 等待领导者分配方案
。这两个步骤后分别对应的请求是 JoinGroup
和 SyncGroup
。JoinGroup
请求。在该请求中,每个消费者成员都需要将自己消费的 topic 进行提交,我们上面描述群组协调器中说过,这么做的目的就是为了让协调器收集足够的元数据信息,来选取消费者组的领导者。通常情况下,第一个发送 JoinGroup 请求的消费者会自动称为领导者。领导者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。如图:SyncGroup
请求给协调者,协调者负责下发群组中的消费策略。下图描述了 SyncGroup 请求的过程:从协调者来看重平衡
新成员加入组 组成员主动离开 组成员崩溃离开 组成员提交位移
新成员入组
Stable
等待分配的过程,这时候如果有新的成员加入组的话,重平衡的过程。组成员离开
close()
方法主动通知协调者它要退出。这里又会有一个新的请求出现 LeaveGroup()请求
。如下图所示组成员崩溃
组成员崩溃
,崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms 控制的。如下图所示:重平衡时提交位移
评论