Kafka消费者分区分配策略及自定义分配策略
共 4288字,需浏览 9分钟
·
2020-07-30 00:14
点击上方蓝色字体,选择“设为星标”
回复”资源“获取更多资源大数据技术与架构点击右侧关注,大数据开发领域最强公众号!暴走大数据点击右侧关注,暴走大数据!
kafka消费者如何分配分区以及分配分区策略和源码解释
我们知道kafka的主题中数据数据是按照分区的概念来的,一个主题可能分配了多个分区,每个分区配置了复制系数,为了可用性,在多个broker中进行复制,一个分区在多个broker中选举出一个副本首领,消费者只访问这个分区副本首领,这些在本章节不重要,本章节阐述一个消费者如何选定一个主题中多个分区中的一个分区,和kafka的分区分配策略核心源码解析。
kafka中分区策略核心实现有两种 一种是range范围策略,一种是roudRobin轮询策略,在构建KafkaConsumer类的时候配置,看一下策略的关系就能自行配置, 配置key为partition.assignment.strategy的具体实现,看下图:
首先我们需要有多种假设来举例
假设我们创建了一个主题,并且8个分区p0-p8,我们有3个消费者c0-c2
先来说说第一种策略, range策略
上面已经做好了一些假设
根据range策略,分区按照顺序平铺,消费者按照顺序平铺
分区数量除以消费者数量,这里是分区数量8除以消费者数量3 等于 2 (N),再分区数量8对消费数量3取余得到2 ( M ),kafka的range算法是前 M个消费能得到N+1个分区,剩余的消费者分配到N个分区
具体算法:假设区分数量为pCout,消费者数量为cCount
n = pCout / cCount 8 / 3 = 2
m = pCount % cCount 8 % 3 = 2
前m(2)个消费者得到n+1(2+1)个分区,剩余的消费者分配到N(2)个分区,最终结果如下图
range策略是kafka默认的一个分区分配的策略可以看看ConsumerConfig类的static块,默认配置的RangeAssignor
想看一下分配分区的策略的入口可以参考KafkaConsumer类中的pollOnce方法进去,里面调用的ensurePartitionAssignment方法,不过这里debug进去看还是挺复杂的,有兴趣的可以参考,篇幅讲的不是这些重点,具体入口可以看下图
下面看一看range策略中核心源码的实现,具体查看RangeAssignor类
@Override
public Map<String, List> assign(Map<String, Integer> partitionsPerTopic,
Map<String, List<String>> subscriptions) {
//获取每个主题消费者们
Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
Map<String, List> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList());
for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
//主题
String topic = topicEntry.getKey();
//这个主题的消费们
List<String> consumersForTopic = topicEntry.getValue();
//主题的分区数量
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null)
continue;
//对主题的消费者进行排序
Collections.sort(consumersForTopic);
//主题数量除以主题消费者数量
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
//主题数量对消费者数量进行取余
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
//封装主题和分区信息
List partitions = partitions(topic, numPartitionsForTopic);
//下面就开始为每一个消费者分配分区,看到这里是不是会发现 消费者分区再均衡,每次添加消费者或者添加分区都会发生再均衡
//事件,不过这里不是重点
for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
//消费者分区起始位置
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
//分配的分区数量, 从我们上面的假设的分区数量和消费者数量可以得出这里的值
// int length = 2 + (i + 1 > 2 ? 0 : 1);
//因为有的无法整除和取余的,所以前面的2个消费者这里会获得3 的结果, 最后一个消费者这里只能得到2
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
//为每个消费者分配分区信息
assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
}
}
return assignment;
}
下面讲一讲kafka自带的第二种消费者分配分区的策略
轮询策略
还是按照上面的假设8个分区3个消费者
8个分区按照顺序平铺
构造消费者环 c0,c1,c2,c0,c1,c2.......
轮询分配过程是 p0 分配给了 c0, p1 分配给了 c1, p2分配给了 c2, p3分配给了c0, p4分配给了 c1, p5分配给了c2, 一次类推,所有分区轮询分配给一个消费者环,大概草图如下
上面草图 多多理解 , 核心源码如下
@Override
public Map
> assign(Map partitionsPerTopic, Map
> subscriptions) { Map
> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList
()); //讲消费者集合进行排序,构建一个消费者环, 内部通过索引位置+1对总数取余的方式实现的环
CircularIterator
assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet())); //对所有主题和分区进行排序, 假设集合中有多个主题/分区-分区,最终排序结果为
// t1/p0-p1-p2,t2/p0-p1,t3/p0-p1-p2
for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
//当前主题
final String topic = partition.topic();
//这里循环遍历看看消费者有没有订阅改topic,否则一直next到下一个消费者,主要的作用是跳过
//没有订阅该主题的消费者
while (!subscriptions.get(assigner.peek()).contains(topic))
assigner.next();
//未当前消费者添加分区信息
assignment.get(assigner.next()).add(partition);
}
return assignment;
}
通过上面的的案例我们是不是可以通过继承AbstractPartitionAssignor抽象类,实现它的assign方法,来自定义消费者分区分配策略,因为这里我们得到了一个所有相关主题和主题分区数量,所有主题对应的消费者,那么就可以在这里根据自己实际场景自定义一些分配策略。
欢迎点赞+收藏+转发朋友圈素质三连文章不错?点个【在看】吧! ?