如何解决 Kafka 消息重复问题!

小哈学Java

共 10304字,需浏览 21分钟

 ·

2023-01-29 10:50

一、前言

数据重复这个问题其实也是挺正常,全链路都有可能会导致数据重复。

cb9ae3b09cb6d141e7e9e97af9e83f27.webp

通常,消息消费时候都会设置一定重试次数来避免网络波动造成的影响,同时带来副作用是可能出现消息重复。

整理下消息重复的几个场景:

1.「生产端:」 遇到异常,基本解决措施都是 「重试」

  • 场景一:leader分区不可用了,抛 LeaderNotAvailableException 异常,等待选出新 leader 分区。

  • 场景二:Controller 所在 Broker 挂了,抛 NotControllerException 异常,等待 Controller 重新选举。

  • 场景三:网络异常、断网、网络分区、丢包等,抛 NetworkException 异常,等待网络恢复。

2.「消费端:」 poll 一批数据,处理完毕还没提交 offset ,机子宕机重启了,又会 poll 上批数据,再度消费就造成了消息重复。

怎么解决?

「先来了解下消息的三种投递语义:」

  • 最多一次(at most once): 消息只发一次,消息可能会丢失,但绝不会被重复发送。例如:mqtt 中 QoS = 0

  • 至少一次(at least once): 消息至少发一次,消息不会丢失,但有可能被重复发送。例如:mqtt 中 QoS = 1

  • 精确一次(exactly once): 消息精确发一次,消息不会丢失,也不会被重复发送。例如:mqtt 中 QoS = 2

了解了这三种语义,再来看如何解决消息重复,即如何实现精准一次,可分为三种方法:

  1. Kafka 幂等性 Producer: 保证生产端发送消息幂等。局限性,是只能保证单分区且单会话(重启后就算新会话)

  2. Kafka 事务: 保证生产端发送消息幂等。解决幂等 Producer 的局限性。

  3. 消费端幂等: 保证消费端接收消息幂等。蔸底方案。

1)Kafka 幂等性 Producer

「幂等性指」:无论执行多少次同样的运算,结果都是相同的。即一条命令,任意多次执行所产生的影响均与一次执行的影响相同。

「幂等性使用示例:在生产端添加对应配置即可」

      
      Properties props = new Properties();  
props.put("enable.idempotence", ture); // 1. 设置幂等  
props.put("acks""all"); // 2. 当 enable.idempotence 为 true,这里默认为 all  
props.put("max.in.flight.requests.per.connection", 5); // 3. 注意

1.设置幂等,启动幂等。

2.配置 acks,注意:一定要设置 acks=all,否则会抛异常。

3.配置max.in.flight.requests.per.connection 需要 <= 5,否则会抛异常 OutOfOrderSequenceException

  • 0.11 >= Kafka < 1.1max.in.flight.request.per.connection = 1

  • Kafka >= 1.1max.in.flight.request.per.connection <= 5

为了更好理解,需要了解下Kafka 幂等机制:

8f0a57764184f8f2bc0052b677afadaf.webp

1.Producer 每次启动后,会向 Broker 申请一个全局唯一的 pid。(重启后 pid 会变化,这也是弊端之一)

2.Sequence Numbe:针对每个 <Topic, Partition> 都对应一个从0开始单调递增的 Sequence,同时 Broker端会缓存这个 seq num

3.判断是否重复: 拿 <pid, seq num> 去 Broker 里对应的队列 ProducerStateEntry.Queue(默认队列长度为 5)查询是否存在

  • 如果 nextSeq == lastSeq + 1,即 服务端seq + 1 == 生产传入seq,则接收。

  • 如果 nextSeq == 0 && lastSeq == Int.MaxValue,即刚初始化,也接收。

  • 反之,要么重复,要么丢消息,均拒绝。

77e822cfc289eb00e4f25f4cd5bea8ba.webp

这种设计针对解决了两个问题:

  1. 「消息重复:」 场景 Broker 保存消息后还没发送 ack 就宕机了,这时候 Producer 就会重试,这就造成消息重复。

  2. 「消息乱序:」 避免场景,前一条消息发送失败而其后一条发送成功,前一条消息重试后成功,造成的消息乱序。

那什么时候该使用幂等:

  1. 如果已经使用 acks=all,使用幂等也可以。

  2. 如果已经使用 acks=0 或者 acks=1,说明你的系统追求高性能,对数据一致性要求不高。不要使用幂等。

2)Kafka 事务

使用 Kafka 事务解决幂等的弊端:单会话且单分区幂等。

Tips:」 这块篇幅较长,这先稍微提及下使用,之后另起一篇。

「事务使用示例:分为生产端 和 消费端」

      
      Properties props = new Properties();  
props.put("enable.idempotence", ture); // 1. 设置幂等  
props.put("acks""all"); // 2. 当 enable.idempotence 为 true,这里默认为 all  
props.put("max.in.flight.requests.per.connection"5); // 3. 最大等待数  
props.put("transactional.id""my-transactional-id"); // 4. 设定事务 id  
  
Producer<String, String> producer = new KafkaProducer<String, String>(props);  
  
// 初始化事务  
producer.initTransactions();  
  
try{  
    // 开始事务  
    producer.beginTransaction();  
  
    // 发送数据  
    producer.send(new ProducerRecord<String, String>("Topic""Key""Value"));  
   
    // 数据发送及 Offset 发送均成功的情况下,提交事务  
    producer.commitTransaction();  
catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {  
    // 数据发送或者 Offset 发送出现异常时,终止事务  
    producer.abortTransaction();  
finally {  
    // 关闭 Producer 和 Consumer  
    producer.close();  
    consumer.close();  
}

这里消费端Consumer 需要设置下配置:isolation.level 参数

  • read_uncommitted:」 这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。

  • read_committed:」 表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

3)消费端幂等

“如何解决消息重复?” 这个问题,其实换一种说法:就是如何解决消费端幂等性问题。

只要消费端具备了幂等性,那么重复消费消息的问题也就解决了。

「典型的方案是使用:消息表,来去重:」

0470bcb56641d02e36e03ac595cf419c.webp
  • 上述栗子中,消费端拉取到一条消息后,开启事务,将消息Id 新增到本地消息表中,同时更新订单信息。

  • 如果消息重复,则新增操作 insert 会异常,同时触发事务回滚。

二、案例:Kafka 幂等性 Producer 使用

环境搭建可参考:https://developer.confluent.io/tutorials/message-ordering/kafka.html#view-all-records-in-the-topic

「准备工作如下:」

1、Zookeeper:本地使用 Docker 启动

      
      $ docker run -d --name zookeeper -p 2181:2181 zookeeper  
a86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4

2、Kafka:版本 2.7.1,源码编译启动(看上文源码搭建启动)

3、启动生产者:Kafka 源码中 exmaple 中

4、启动消息者:可以用 Kafka 提供的脚本

      
      # 举个栗子:topic 需要自己去修改  
cd ./kafka-2.7.1-src/bin  
$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

创建topic :1副本,2 分区

      
      $ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2  
  
# 查看  
$ ./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe

「生产者代码:」

0a0ac5659c216ca65d30fd70d73daa7b.webp
      
      public class KafkaProducerApplication {  
  
    private final Producer<String, String> producer;  
    final String outTopic;  
  
    public KafkaProducerApplication(final Producer<String, String> producer,  
                                    final String topic)
 
{  
        this.producer = producer;  
        outTopic = topic;  
    }  
  
    public void produce(final String message) {  
        final String[] parts = message.split("-");  
        final String key, value;  
        if (parts.length > 1) {  
            key = parts[0];  
            value = parts[1];  
        } else {  
            key = null;  
            value = parts[0];  
        }  
        final ProducerRecord<String, String> producerRecord  
            = new ProducerRecord<>(outTopic, key, value);  
        producer.send(producerRecord,  
                (recordMetadata, e) -> {  
                    if(e != null) {  
                        e.printStackTrace();  
                    } else {  
                        System.out.println("key/value " + key + "/" + value + "\twritten to topic[partition] " + recordMetadata.topic() + "[" + recordMetadata.partition() + "] at offset " + recordMetadata.offset());  
                    }  
                }  
        );  
    }  
  
    public void shutdown() {  
        producer.close();  
    }  
  
    public static void main(String[] args) {  
  
        final Properties props = new Properties();  
  
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");  
        props.put(ProducerConfig.ACKS_CONFIG, "all");  
  
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");  
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);  
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);  
  
        final String topic = "myTopic";  
        final Producer<String, String> producer = new KafkaProducer<>(props);  
        final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);  
  
        String filePath = "/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";  
        try {  
            List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));  
            linesToProduce.stream().filter(l -> !l.trim().isEmpty())  
                    .forEach(producerApp::produce);  
            System.out.println("Offsets and timestamps committed in batch from " + filePath);  
        } catch (IOException e) {  
            System.err.printf("Error reading file %s due to %s %n", filePath, e);  
        } finally {  
            producerApp.shutdown();  
        }  
    }  
}

「启动生产者后,控制台输出如下:」

8706719777f98bbf185be1dcc3d539e5.webp

「启动消费者:」

      
      $ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic
84d0842ea24b2461ddce494766663a45.webp

修改配置 acks

启用幂等的情况下,调整acks 配置,生产者启动后结果是怎样的:

  • 修改配置 acks = 1

  • 修改配置 acks = 0

会直接报错:

      
      Exception in thread "main" org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer.  
Otherwise we cannot guarantee idempotence.
ceecc917126beaf21fb36a17d6756fdf.webp

修改配置 max.in.flight.requests.per.connection

「启用幂等的情况下,调整此配置,结果是怎样的:」

将  max.in.flight.requests.per.connection > 5 会怎样?

053b57e910b7e437f200a8eb642d3df0.webp

「当然会报错:」

      
      Caused by: org.apache.kafka.common.config.ConfigException: Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.
314d47ad6900d09c690a312730deaf6f.webp

作者:格格步入

来源:juejin.cn/post/7172897190627508237

    
      
        
          

1. 负载均衡 LVS vs Nginx 对比!还傻傻分不清?

2. 一款强大的开源认证和访问控制利器:KeyCloak 太牛了!

3. MySQL 批量操作,一次插入多少行数据效率最高?

4. 美团动态线程池,香啊!

            

最近面试BAT,整理一份面试资料 Java面试BATJ通关手册 ,覆盖了Java核心技术、JVM、Java并发、SSM、微服务、数据库、数据结构等等。

获取方式:点“ 在看 ”,关注公众号并回复  Java  领取,更多内容陆续奉上。

            

PS:因公众号平台更改了推送规则,如果不想错过内容,记得读完点一下 在看 ,加个 星标 ,这样每次新文章推送才会第一时间出现在你的订阅列表里。

“在看”支持小哈呀,谢谢啦

浏览 80
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报