聊聊 Kafka:协调者 GroupCoordinator 源码剖析之实例化与启动 GroupCoordinator
共 9870字,需浏览 20分钟
·
2022-07-08 10:26
一、前言
在聊聊 Kafka 系列专栏中,我们前面讲了一篇 聊聊 Kafka:Consumer 源码解析之 Consumer 如何加入 Consumer Group,其实那一篇主要讲的是客户端 Consumer 加入组请求、加入组响应、同步组请求、同步组响应等操作,我们这一篇主要来讲服务端侧协调者 GroupCoordinator 处理的请求。服务端处理客户端请求的入口都是 KafkaApis 类,它会根据不同的请求类型分发给不同的方法处理。如下图:
二、主要处理流程
GroupCoordinator 主要有四大类处理的命令:COORDINATOR、GROUP、OFFSET、HEARTBEAT,具体命令如下:
COORDINATOR 命令:
ApiKeys.FIND_COORDINATOR
GROUP 命令:
ApiKeys.JOIN_GROUP
ApiKeys.LEAVE_GROUP
ApiKeys.SYNC_GROUP
ApiKeys.DESCRIBE_GROUPS
ApiKeys.LIST_GROUPS
ApiKeys.DELETE_GROUPS
OFFSET 命令:
ApiKeys.OFFSET_COMMIT
ApiKeys.OFFSET_FETCH
ApiKeys.OFFSET_FOR_LEADER_EPOCH
ApiKeys.OFFSET_DELETE
HEARTBEAT 命令:
ApiKeys.HEARTBEAT
我们下面针对主要的几个命令来进行源码剖析:
三、GroupCoordinator
对主要命令分析之前,我们还是先来看下协调者 GroupCoordinator 的主要数据结构以及它如何维护管理组成员关系的。
直接看注释不难得出:
GroupCoordinator 处理群组成员关系和偏移管理
GroupCoordinator 中的延迟操作使用 “group” 作为延迟操作锁
我们直接从入口开始看
3.1 实例化 GroupCoordinator
创建了两个 DelayedOperationPurgatory,主要是用于延迟队列操作关于:Heartbeat、Rebalance。
关于 offset、group 的配置信息加载:offsetConfig、groupConfig。
初始化 GroupMetadataManager group 元数据管理器
返回 GroupCoordinator 实例
3.2 启动 GroupCoordinator
上面实例化 GroupCoordinator 的时候会初始化 GroupMetadataManager group 元数据管理器,启动 GroupCoordinator 的时候也会 GroupMetadataManager 的 startup 方法,所以不难看出 GroupCoordinator 的功能部分应该是靠 GroupMetadataManager 这个管理器,那么我们来看下 GroupMetadataManager 中有哪些信息或组件:
3.2.1 GroupMetadataManager 元数据
class GroupMetadataManager(brokerId: Int,
interBrokerProtocolVersion: ApiVersion,
config: OffsetConfig,
// replicaManager 对象,管理 __consumer_offsets
val replicaManager: ReplicaManager,
zkClient: KafkaZkClient,
time: Time,
metrics: Metrics) extends Logging with KafkaMetricsGroup {
private val compressionType: CompressionType = CompressionType.forId(config.offsetsTopicCompressionCodec.codec)
// 记录每个 group 在服务端对应的 GroupMetadata 对象
private val groupMetadataCache = new Pool[String, GroupMetadata]
/* lock protecting access to loading and owned partition sets */
private val partitionLock = new ReentrantLock()
/* partitions of consumer groups that are being loaded, its lock should be always called BEFORE the group lock if needed */
// 记录了正在加载的 offsets topic 分区的 ID
private val loadingPartitions: mutable.Set[Int] = mutable.Set()
/* partitions of consumer groups that are assigned, using the same loading partition lock */
// 记录了已经加载的 offsets topic 分区的 ID
private val ownedPartitions: mutable.Set[Int] = mutable.Set()
/* shutting down flag */
private val shuttingDown = new AtomicBoolean(false)
/* number of partitions for the consumer metadata topic */
// 记录 offsets topic 的分区数量,这个字段会调用 getGroupMetadataTopicPartitionCount() 进行初始化,默认 50。
private val groupMetadataTopicPartitionCount = getGroupMetadataTopicPartitionCount
/* single-thread scheduler to handle offset/group metadata cache loading and unloading */
// 处理偏移、组元数据缓存加载和卸载
private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "group-metadata-manager-")
/* The groups with open transactional offsets commits per producer. We need this because when the commit or abort
* marker comes in for a transaction, it is for a particular partition on the offsets topic and a particular producerId.
* We use this structure to quickly find the groups which need to be updated by the commit/abort marker. */
// 代表 transactional 有关的 producer 对应的 offset 记录
private val openGroupsForProducer = mutable.HashMap[Long, mutable.Set[String]]()
/* setup metrics*/
private val partitionLoadSensor = metrics.sensor(GroupMetadataManager.LoadTimeSensor)
...
}
3.2.2 GroupMetadata 元数据
@nonthreadsafe
private[group] class GroupMetadata(val groupId: String, initialState: GroupState, time: Time) extends Logging {
type JoinCallback = JoinGroupResult => Unit
private[group] val lock = new ReentrantLock
private var state: GroupState = initialState
var currentStateTimestamp: Option[Long] = Some(time.milliseconds())
var protocolType: Option[String] = None
var protocolName: Option[String] = None
var generationId = 0
private var leaderId: Option[String] = None
// 成员的集合
private val members = new mutable.HashMap[String, MemberMetadata]
// Static membership mapping [key: group.instance.id, value: member.id]
private val staticMembers = new mutable.HashMap[String, String]
private val pendingMembers = new mutable.HashSet[String]
private var numMembersAwaitingJoin = 0
private val supportedProtocols = new mutable.HashMap[String, Integer]().withDefaultValue(0)
// 每个 topic-partition 对应的 CommitRecordMetadataAndOffset (这个里面含有 offset 的 long 值、OffsetAndMetadata [offset 提交的时间戳、offset 超时的时间戳])
private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]
private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]()
private var receivedTransactionalOffsetCommits = false
private var receivedConsumerOffsetCommits = false
// When protocolType == `consumer`, a set of subscribed topics is maintained. The set is
// computed when a new generation is created or when the group is restored from the log.
private var subscribedTopics: Option[Set[String]] = None
var newMemberAdded: Boolean = false
...
}
对于 Kafka 服务端的组,GroupState 有五种状态 Empty、PreparingRebalance、CompletingRebalance、Stable、Dead。他们的状态转换如下图所示:
3.2.3 启动 GroupCoordinator 具体的流程
线程池 scheduler 启动
向 scheduler 中添加一个定时任务
delete-expired-group-metadata
,用于清除过期的 group metadata。group.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs)
将过期的记录从 offsets 中去除
将有效的记录返回 Map[TopicPartition, OffsetAndMetadata]
主要的函数 cleanupGroupMetadata(groups: Iterable[GroupMetadata], selector: GroupMetadata => Map[TopicPartition, OffsetAndMetadata])
遍历 groups 获取 group 即 GroupMetadata,从 selector 中获取对应的 OffsetAndMetadata,对 group 进行判断如下,为 true 则将 group 状态改为 Dead。
group.is(Empty) && !group.hasOffsets
此时刚启动起来,没有成员加入,我们到此就打住了,实例化与启动 GroupCoordinator 就到这,后面有关状态的流转下篇再来分析。
假设一台 broker 启动了,然后服务端的 GroupCoordinator 在此时启动了。那么后面会发生什么?从 GroupMetadata 的状态变更可以看出来,一开始是 Empty 因为刚起来什么都没有。然后想要状态变更就有两个途径:新成员加入、超时过期。
我们这里看下新成员加入的场景,当前的服务端 GroupCoordinator 已启动了,一个新的消费者组过来了,首先需要找到这个 GroupCoordinator,即 FIND_COORDINATOR,然后发送加入的请求 JOIN_GROUP,再同步分区的分配信息 SYNC_GROUP。
我们下一篇来说一下找到这个 GroupCoordinator,即 FIND_COORDINATOR。
欢迎大家关注我的公众号【老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。