Kafka Is Acting Up Again!
01
Preface
02
Reproducing the problem
2021-02-28 22:03:05 131 pool-7-thread-3 ERROR [] -
commit failed
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na]
at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
03
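Analyzing the problem
Since Kafka triggered a rebalance, let's look at when Kafka triggers one.
What is a rebalance
A rebalance is the process by which all consumer instances in a consumer group agree on how the partitions of the subscribed topics are divided among them.
When a rebalance is triggered
- The number of members in the group changes: a new consumer joins the group, or a member leaves it, either by crashing or by leaving voluntarily.
- The number of subscribed topics changes.
- The number of partitions of a subscribed topic changes.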
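To make the triggers concrete, here is a simplified sketch assuming a round-robin style strategy: partitions are dealt out in turn to the sorted list of members. It is only loosely modeled on Kafka's RoundRobinAssignor and is not its implementation; the class and method names are hypothetical. It shows that when a member leaves or the partition count changes, the assignment has to be recomputed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a round-robin style assignment; NOT Kafka's real assignor code.
final class RebalanceSketch {
    // Deals partitions 0..partitionCount-1 out to the members in sorted order.
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (members.isEmpty()) {
            return result; // no consumer left: nothing can be assigned
        }
        List<String> sorted = new ArrayList<>(members);
        sorted.sort(null); // natural order, so every run yields the same result
        for (String member : sorted) {
            result.put(member, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            result.get(sorted.get(p % sorted.size())).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // Three consumers share a 6-partition topic.
        System.out.println(assign(List.of("c1", "c2", "c3"), 6));
        // c3 crashes or leaves: membership changed, partitions are redistributed.
        System.out.println(assign(List.of("c1", "c2"), 6));
        // The topic is expanded to 8 partitions: partition count changed.
        System.out.println(assign(List.of("c1", "c2"), 8));
    }
}
```

Running main prints {c1=[0, 3], c2=[1, 4], c3=[2, 5]} and then {c1=[0, 2, 4], c2=[1, 3, 5]}: the partitions c3 owned (2 and 5) move to the surviving members.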
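In practice, membership changes are the most common trigger, and three consumer parameters decide when a member is considered gone.

session.timeout.ms: if the Coordinator receives no heartbeat from a consumer instance in the group within this period, it considers that instance dead, removes it from the group and starts a rebalance. The default is 10 seconds (raised to 45 seconds in Kafka 3.0).

heartbeat.interval.ms: how often heartbeats are sent. The smaller this value, the more frequently a consumer instance sends heartbeat requests. Frequent heartbeats consume extra bandwidth, but the benefit is that the consumer learns sooner whether a rebalance has started, because the Coordinator tells each consumer instance to rejoin by returning the REBALANCE_IN_PROGRESS error in the heartbeat response.

max.poll.interval.ms: limits the maximum time between two consecutive calls to poll() by the consumer application. The default is 5 minutes: if the consumer cannot finish processing the messages returned by poll() within 5 minutes, it proactively sends a "leave group" request and the Coordinator starts a new rebalance.

A common recommendation to avoid rebalances caused by missed heartbeats:
- set session.timeout.ms = 6s;
- set heartbeat.interval.ms = 2s;
- make sure a consumer instance can send at least 3 rounds of heartbeats before it is judged "dead", i.e. session.timeout.ms >= 3 * heartbeat.interval.ms.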
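These rules can be written down as simple predicates. The sketch below is an illustrative model, not Kafka client code: the class and method names are hypothetical, the checks are simplified (the real client tracks these timers internally), and the 1 second of processing per record in main is an assumed figure. It checks the "three heartbeats per session" rule and whether a full poll() batch fits into max.poll.interval.ms.

```java
// Illustrative model of the consumer liveness rules; NOT the Kafka client implementation.
final class LivenessRules {
    // Coordinator view: no heartbeat for longer than session.timeout.ms => member considered dead.
    static boolean deadBySessionTimeout(long lastHeartbeatMs, long nowMs, long sessionTimeoutMs) {
        return nowMs - lastHeartbeatMs > sessionTimeoutMs;
    }

    // Consumer view: more than max.poll.interval.ms between poll() calls => consumer leaves the group.
    static boolean leavesByPollInterval(long lastPollMs, long nowMs, long maxPollIntervalMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    // Rule of thumb: at least 3 heartbeats must fit into one session timeout.
    static boolean heartbeatRuleHolds(long sessionTimeoutMs, long heartbeatIntervalMs) {
        return sessionTimeoutMs >= 3 * heartbeatIntervalMs;
    }

    // Worst-case time to process one full batch returned by poll().
    static long worstCaseBatchMs(int maxPollRecords, long worstPerRecordMs) {
        return (long) maxPollRecords * worstPerRecordMs;
    }

    public static void main(String[] args) {
        System.out.println(heartbeatRuleHolds(6_000, 2_000)); // recommended 6 s / 2 s: true
        System.out.println(heartbeatRuleHolds(6_000, 3_000)); // only 2 heartbeats fit: false

        long maxPollInterval = 300_000; // default max.poll.interval.ms (5 minutes)
        long perRecord = 1_000;         // assumed: 1 s of downstream work per record
        long batch = worstCaseBatchMs(500, perRecord); // 500 = default max.poll.records
        // 500 s of work > 300 s allowed: the consumer would leave the group before finishing.
        System.out.println(leavesByPollInterval(0, batch, maxPollInterval));
        System.out.println(deadBySessionTimeout(0, 7_000, 6_000)); // 7 s without a heartbeat
    }
}
```

The batch check is the arithmetic behind the advice that follows: either the interval must exceed the worst-case batch time, or max.poll.records must shrink until it does.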
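This makes the value of max.poll.interval.ms especially critical. To avoid unexpected rebalances, set it generously: a little longer than the longest time downstream processing of one batch can take.
Position and committed offsets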
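Kafka offsets are managed by the consumer. There are two kinds: the position (the fetch offset) and the committed offset. The position is the consumer's current progress in a partition, i.e. the offset of the next message poll() will return. After messages are consumed, the offset has to be committed; when committing (for example with commitSync() called without arguments), Kafka sends the current position to the Coordinator as the partition's committed offset.
The fix suggested by the exception log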
Increase max.poll.interval.ms and session.timeout.ms, reduce max.poll.records, and make sure the consumer commits offsets promptly once it has finished processing its messages.
04
Solving the problem
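// Listener manually bound to partition 0 of the logs topic; offsets are committed through
// Acknowledgment, which requires a manual AckMode on the container factory
@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory")
public void consumerReceive(ConsumerRecord<?, ?> record, Acknowledgment ack) {
    logger.info("topic is {}, offset is {}, value is {}", record.topic(), record.offset(), record.value());
    try {
        Object value = record.value();
        logger.info(String.valueOf(value)); // String.valueOf avoids an NPE on null values
        // Acknowledge (commit) only after the record has been processed successfully
        ack.acknowledge();
    } catch (Exception e) {
        // Exception as last argument without a placeholder, so the stack trace is logged
        logger.error("Log consumer error", e);
    }
}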
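The listener acknowledges, and therefore commits, only after a record has been processed. The toy model below, a single partition with a position and a committed offset, shows what a failed commit like the one in the log means: messages processed after the last successful commit are delivered again once the partition is reassigned. It is a hypothetical illustration, not Kafka or Spring code; commit() mimics commitSync() without arguments, which commits the current position, and rebalance() stands in for a new owner resuming from the committed offset.

```java
import java.util.List;

// Hypothetical toy model of one partition; NOT Kafka or Spring code.
final class OffsetModel {
    private final List<String> log; // messages in the partition, index = offset
    private long position;          // offset of the next message poll() returns
    private long committed;         // offset stored with the group Coordinator

    OffsetModel(List<String> log) {
        this.log = log;
    }

    // Fetch up to maxRecords messages and advance the position.
    List<String> poll(int maxRecords) {
        int from = (int) position;
        int to = Math.min(log.size(), from + maxRecords);
        position = to;
        return log.subList(from, to);
    }

    // Like commitSync() without arguments: the position becomes the committed offset.
    void commit() {
        committed = position;
    }

    // After a rebalance, the (new) owner resumes from the committed offset.
    void rebalance() {
        position = committed;
    }

    long position() { return position; }
    long committed() { return committed; }

    public static void main(String[] args) {
        OffsetModel p = new OffsetModel(List.of("m0", "m1", "m2", "m3", "m4"));
        System.out.println(p.poll(2)); // [m0, m1]
        p.commit();                    // committed offset is now 2
        System.out.println(p.poll(2)); // [m2, m3] processed, then suppose the commit fails...
        p.rebalance();                 // ...and the partition is reassigned
        System.out.println(p.poll(2)); // [m2, m3] are delivered again
    }
}
```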
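First attempt
Following the advice in the exception log, I first increased max.poll.interval.ms and session.timeout.ms and reduced max.poll.records: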
spring:
  kafka:
    consumer:
      properties:
        max.poll.interval.ms: 3600000
        max.poll.records: 50
        session.timeout.ms: 60000
        heartbeat.interval.ms: 3000
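As a quick sanity check on this attempt, the same four values can be expressed as plain consumer properties (Spring Boot passes the keys under spring.kafka.consumer.properties through to the Kafka consumer) and run through simple arithmetic. The class and method names are hypothetical. With these values each of the 50 records in a batch may take up to 72 seconds, and 20 heartbeats fit into one session timeout, so on paper the timeouts were already generous.

```java
import java.util.Properties;

// Hypothetical helper: arithmetic on the first-attempt consumer settings.
final class FirstAttemptCheck {
    static Properties firstAttempt() {
        Properties p = new Properties();
        p.setProperty("max.poll.interval.ms", "3600000"); // 1 hour between poll() calls
        p.setProperty("max.poll.records", "50");
        // 60 s; the broker also enforces group.min/max.session.timeout.ms
        p.setProperty("session.timeout.ms", "60000");
        p.setProperty("heartbeat.interval.ms", "3000");   // 3 s
        return p;
    }

    static long get(Properties p, String key) {
        return Long.parseLong(p.getProperty(key));
    }

    // Time budget per record if a full batch must be processed before the next poll().
    static long perRecordBudgetMs(Properties p) {
        return get(p, "max.poll.interval.ms") / get(p, "max.poll.records");
    }

    // How many heartbeats fit into one session timeout (the rule of thumb asks for >= 3).
    static long heartbeatsPerSession(Properties p) {
        return get(p, "session.timeout.ms") / get(p, "heartbeat.interval.ms");
    }

    public static void main(String[] args) {
        Properties p = firstAttempt();
        System.out.println(perRecordBudgetMs(p));    // 72000
        System.out.println(heartbeatsPerSession(p)); // 20
    }
}
```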
Final solution
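With these settings (including session.timeout.ms: 60000), the problem was not fixed at all; the rebalance exception was still thrown. What finally solved it was giving this listener its own consumer group. Before:
@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory")
public void consumerReceive(ConsumerRecord<?, ?> record, Acknowledgment ack) {
    // ... body unchanged ...
}
After (only groupId changes, from "kafka-consumer" to "kafka-consumer-logs"):
@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer-logs", containerFactory = "kafkaListenerContainerFactory")
public void consumerReceive(ConsumerRecord<?, ?> record, Acknowledgment ack) {
    // ... body unchanged ...
}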