Kafka 怎么顺序消费?面试必备!

Java技术栈

共 3834字,需浏览 8分钟

 · 2022-03-23

点击关注公众号,Java干货及时送达

前言

本文针对解决Kafka不同Topic之间存在一定的数据关联时的顺序消费问题。

如存在Topic-insert和Topic-update分别是对数据的插入和更新,当insert和update操作为同一数据时,应保证先insert再update。

1、问题引入

kafka的顺序消费一直是一个难以解决的问题,kafka的消费策略是对于同Topic同Partition的消息可保证顺序消费,其余无法保证。

如果一个Topic只有一个Partition,那么这个Topic对应consumer的消费必然是有序的。不同的Topic的任何情况下都无法保证consumer的消费顺序和producer的发送顺序一致。

如果不同Topic之间存在数据关联且对消费顺序有要求,该如何处理?本文主要解决此问题。

另外,Kafka 系列面试题和答案全部整理好了,微信搜索Java技术栈,在后台发送:面试,可以在线阅读。

2、解决思路

现有Topic-insert和Topic-update,数据唯一标识为id,对于id=1的数据而言,要保证Topic-insert消费在前,Topic-update消费在后。想成为架构师,这份架构师图谱建议看看,少走弯路

两个Topic的消费为不同线程处理,所以为了保证在同一时间内的同一数据标识的消息仅有一个业务逻辑在处理,需要对业务添加锁操作。

使用synchronized进行加锁的话,会影响无关联的insert和update的数据消费能力,如id=1的insert和id=2的update,在synchronized的情况下,无法并发处理,这是没有必要的,我们需要的是对于id=1的insert和id=1的update在同一时间只有一个在处理,所以使用细粒度锁来完成加锁的操作。

细粒度锁实现:https://blog.csdn.net/qq_38245668/article/details/105891161

PS:如果为分布式系统,细粒度锁需要使用分布式锁的对应实现。

在对insert和update加锁之后,其实还是没有解决消费顺序的问题,只是确保了同一时间只有一个业务在处理。 对于消费顺序异常的问题,也就是先消费了update再消费insert的情况。

处理方式:消费到update数据,校验库中是否存在当前数据(也就是是否执行insert),如果没有,就将当前update数据存入缓存,key为数据标识id,在insert消费时检查是否存在id对应的update缓存,如果有,就证明当前数据的消费顺序异常,需执行update操作,再将缓存数据移除。

点击关注公众号,Java干货及时送达

3、实现方案

 UPDATE_DATA_MAP = new ConcurrentHashMap<>();    // 数据存储    private Map DATA_MAP = new ConcurrentHashMap<>();    private WeakRefHashLock weakRefHashLock;    public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {        this.weakRefHashLock = weakRefHashLock;    }    @KafkaListener(topics = "TOPIC_INSERT")    public void insert(ConsumerRecord record, Acknowledgment acknowledgment) throws InterruptedException{        // 模拟顺序异常,也就是insert后消费,这里线程sleep        Thread.sleep(1000);        String id = record.value();        log.info("接收到insert :: {}", id);        Lock lock = weakRefHashLock.lock(id);        lock.lock();        try {            log.info("开始处理 {} 的insert", id);            // 模拟 insert 业务处理            Thread.sleep(1000);            // 从缓存中获取 是否存在有update数据            if (UPDATE_DATA_MAP.containsKey(id)){                // 缓存数据存在,执行update                doUpdate(id);            }            log.info("处理 {} 的insert 结束", id);        }finally {            lock.unlock();        }        acknowledgment.acknowledge();    }    @KafkaListener(topics = "TOPIC_UPDATE")    public void update(ConsumerRecord record, Acknowledgment acknowledgment) throws InterruptedException{        String id = record.value();        log.info("接收到update :: {}", id);        Lock lock = weakRefHashLock.lock(id);        lock.lock();        try {            // 测试使用,不做数据库的校验            if (!DATA_MAP.containsKey(id)){                // 未找到对应数据,证明消费顺序异常,将当前数据加入缓存                log.info("消费顺序异常,将update数据 {} 加入缓存", id);                UPDATE_DATA_MAP.put(id, id);            }else {                doUpdate(id);            }        }finally {            lock.unlock();        }        acknowledgment.acknowledge();    }    void doUpdate(String id) throws InterruptedException{        // 模拟 update        log.info("开始处理update::{}", id);        Thread.sleep(1000);        log.info("处理update::{} 结束", id);    }}日志(代码中已模拟必现消费顺序异常的场景):接收到update ::1消费顺序异常,将update数据 1 加入缓存接收到insert ::1开始处理 1 的insert开始处理update::1处理update::1 结束处理 1 的insert 结束观察日志,此方案可正常处理不同Topic再存在数据关联的消费顺序问题。" linktype="text" imgurl="" imgdata="null" data-itemshowtype="0" tab="innerlink" data-linktype="2">消息发送:

 UPDATE_DATA_MAP = new ConcurrentHashMap<>();    // 数据存储    private Map DATA_MAP = new ConcurrentHashMap<>();    private WeakRefHashLock weakRefHashLock;    public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {        this.weakRefHashLock = weakRefHashLock;    }    @KafkaListener(topics = "TOPIC_INSERT")    public void insert(ConsumerRecord record, Acknowledgment acknowledgment) throws InterruptedException{        // 模拟顺序异常,也就是insert后消费,这里线程sleep        Thread.sleep(1000);        String id = record.value();        log.info("接收到insert :: {}", id);        Lock lock = weakRefHashLock.lock(id);        lock.lock();        try {            log.info("开始处理 {} 的insert", id);            // 模拟 insert 业务处理            Thread.sleep(1000);            // 从缓存中获取 是否存在有update数据            if (UPDATE_DATA_MAP.containsKey(id)){                // 缓存数据存在,执行update                doUpdate(id);            }            log.info("处理 {} 的insert 结束", id);        }finally {            lock.unlock();        }        acknowledgment.acknowledge();    }    @KafkaListener(topics = "TOPIC_UPDATE")    public void update(ConsumerRecord record, Acknowledgment acknowledgment) throws InterruptedException{        String id = record.value();        log.info("接收到update :: {}", id);        Lock lock = weakRefHashLock.lock(id);        lock.lock();        try {            // 测试使用,不做数据库的校验            if (!DATA_MAP.containsKey(id)){                // 未找到对应数据,证明消费顺序异常,将当前数据加入缓存                log.info("消费顺序异常,将update数据 {} 加入缓存", id);                UPDATE_DATA_MAP.put(id, id);            }else {                doUpdate(id);            }        }finally {            lock.unlock();        }        acknowledgment.acknowledge();    }    void doUpdate(String id) throws InterruptedException{        // 模拟 update        log.info("开始处理update::{}", id);        Thread.sleep(1000);        log.info("处理update::{} 结束", id);    }}日志(代码中已模拟必现消费顺序异常的场景):接收到update ::1消费顺序异常,将update数据 1 加入缓存接收到insert ::1开始处理 1 的insert开始处理update::1处理update::1 结束处理 1 的insert 结束观察日志,此方案可正常处理不同Topic再存在数据关联的消费顺序问题。" linktype="text" imgurl="" imgdata="null" data-itemshowtype="0" tab="innerlink" data-linktype="2">kafkaTemplate.send("TOPIC_INSERT""1");
kafkaTemplate.send("TOPIC_UPDATE""1");

最新 Kafka 面试题整理好了,大家可以在Java面试库小程序在线刷题。

监听代码示例:

 UPDATE_DATA_MAP = new ConcurrentHashMap<>();    // 数据存储    private Map DATA_MAP = new ConcurrentHashMap<>();    private WeakRefHashLock weakRefHashLock;    public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {        this.weakRefHashLock = weakRefHashLock;    }    @KafkaListener(topics = "TOPIC_INSERT")    public void insert(ConsumerRecord record, Acknowledgment acknowledgment) throws InterruptedException{        // 模拟顺序异常,也就是insert后消费,这里线程sleep        Thread.sleep(1000);        String id = record.value();        log.info("接收到insert :: {}", id);        Lock lock = weakRefHashLock.lock(id);        lock.lock();        try {            log.info("开始处理 {} 的insert", id);            // 模拟 insert 业务处理            Thread.sleep(1000);            // 从缓存中获取 是否存在有update数据            if (UPDATE_DATA_MAP.containsKey(id)){                // 缓存数据存在,执行update                doUpdate(id);            }            log.info("处理 {} 的insert 结束", id);        }finally {            lock.unlock();        }        acknowledgment.acknowledge();    }    @KafkaListener(topics = "TOPIC_UPDATE")    public void update(ConsumerRecord record, Acknowledgment acknowledgment) throws InterruptedException{        String id = record.value();        log.info("接收到update :: {}", id);        Lock lock = weakRefHashLock.lock(id);        lock.lock();        try {            // 测试使用,不做数据库的校验            if (!DATA_MAP.containsKey(id)){                // 未找到对应数据,证明消费顺序异常,将当前数据加入缓存                log.info("消费顺序异常,将update数据 {} 加入缓存", id);                UPDATE_DATA_MAP.put(id, id);            }else {                doUpdate(id);            }        }finally {            lock.unlock();        }        acknowledgment.acknowledge();    }    void doUpdate(String id) throws InterruptedException{        // 模拟 update        log.info("开始处理update::{}", id);        Thread.sleep(1000);        log.info("处理update::{} 结束", id);    }}日志(代码中已模拟必现消费顺序异常的场景):接收到update ::1消费顺序异常,将update数据 1 加入缓存接收到insert ::1开始处理 1 的insert开始处理update::1处理update::1 结束处理 1 的insert 结束观察日志,此方案可正常处理不同Topic再存在数据关联的消费顺序问题。" linktype="text" imgurl="" imgdata="null" data-itemshowtype="0" tab="innerlink" data-linktype="2">KafkaListenerDemo.java

 UPDATE_DATA_MAP = new ConcurrentHashMap<>();    // 数据存储    private Map DATA_MAP = new ConcurrentHashMap<>();    private WeakRefHashLock weakRefHashLock;    public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {        this.weakRefHashLock = weakRefHashLock;    }    @KafkaListener(topics = "TOPIC_INSERT")    public void insert(ConsumerRecord record, Acknowledgment acknowledgment) throws InterruptedException{        // 模拟顺序异常,也就是insert后消费,这里线程sleep        Thread.sleep(1000);        String id = record.value();        log.info("接收到insert :: {}", id);        Lock lock = weakRefHashLock.lock(id);        lock.lock();        try {            log.info("开始处理 {} 的insert", id);            // 模拟 insert 业务处理            Thread.sleep(1000);            // 从缓存中获取 是否存在有update数据            if (UPDATE_DATA_MAP.containsKey(id)){                // 缓存数据存在,执行update                doUpdate(id);            }            log.info("处理 {} 的insert 结束", id);        }finally {            lock.unlock();        }        acknowledgment.acknowledge();    }    @KafkaListener(topics = "TOPIC_UPDATE")    public void update(ConsumerRecord record, Acknowledgment acknowledgment) throws InterruptedException{        String id = record.value();        log.info("接收到update :: {}", id);        Lock lock = weakRefHashLock.lock(id);        lock.lock();        try {            // 测试使用,不做数据库的校验            if (!DATA_MAP.containsKey(id)){                // 未找到对应数据,证明消费顺序异常,将当前数据加入缓存                log.info("消费顺序异常,将update数据 {} 加入缓存", id);                UPDATE_DATA_MAP.put(id, id);            }else {                doUpdate(id);            }        }finally {            lock.unlock();        }        acknowledgment.acknowledge();    }    void doUpdate(String id) throws InterruptedException{        // 模拟 update        log.info("开始处理update::{}", id);        Thread.sleep(1000);        log.info("处理update::{} 结束", id);    }}日志(代码中已模拟必现消费顺序异常的场景):接收到update ::1消费顺序异常,将update数据 1 加入缓存接收到insert ::1开始处理 1 的insert开始处理update::1处理update::1 结束处理 1 的insert 结束观察日志,此方案可正常处理不同Topic再存在数据关联的消费顺序问题。" linktype="text" imgurl="" imgdata="null" data-itemshowtype="0" tab="innerlink" data-linktype="2">@Component
@Slf4j
public class KafkaListenerDemo {

    // 消费到的数据缓存
    private Map UPDATE_DATA_MAP = new ConcurrentHashMap<>();
    // 数据存储
    private Map DATA_MAP = new ConcurrentHashMap<>();
    private WeakRefHashLock weakRefHashLock;

    public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {
        this.weakRefHashLock = weakRefHashLock;
    }

    @KafkaListener(topics = "TOPIC_INSERT")
    public void insert(ConsumerRecord record, Acknowledgment acknowledgment) throws InterruptedException{
        // 模拟顺序异常,也就是insert后消费,这里线程sleep
        Thread.sleep(1000);

        String id = record.value();
        log.info("接收到insert :: {}", id);
        Lock lock = weakRefHashLock.lock(id);
        lock.lock();
        try {
            log.info("开始处理 {} 的insert", id);
            // 模拟 insert 业务处理
            Thread.sleep(1000);
            // 从缓存中获取 是否存在有update数据
            if (UPDATE_DATA_MAP.containsKey(id)){
                // 缓存数据存在,执行update
                doUpdate(id);
            }
            log.info("处理 {} 的insert 结束", id);
        }finally {
            lock.unlock();
        }
        acknowledgment.acknowledge();
    }

    @KafkaListener(topics = "TOPIC_UPDATE")
    public void update(ConsumerRecord record, Acknowledgment acknowledgment) throws InterruptedException{

        String id = record.value();
        log.info("接收到update :: {}", id);
        Lock lock = weakRefHashLock.lock(id);
        lock.lock();
        try {
            // 测试使用,不做数据库的校验
            if (!DATA_MAP.containsKey(id)){
                // 未找到对应数据,证明消费顺序异常,将当前数据加入缓存
                log.info("消费顺序异常,将update数据 {} 加入缓存", id);
                UPDATE_DATA_MAP.put(id, id);
            }else {
                doUpdate(id);
            }
        }finally {
            lock.unlock();
        }
        acknowledgment.acknowledge();
    }

    void doUpdate(String id) throws InterruptedException{
        // 模拟 update
        log.info("开始处理update::{}", id);
        Thread.sleep(1000);
        log.info("处理update::{} 结束", id);
    }

}

 UPDATE_DATA_MAP = new ConcurrentHashMap<>();    // 数据存储    private Map DATA_MAP = new ConcurrentHashMap<>();    private WeakRefHashLock weakRefHashLock;    public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {        this.weakRefHashLock = weakRefHashLock;    }    @KafkaListener(topics = "TOPIC_INSERT")    public void insert(ConsumerRecord record, Acknowledgment acknowledgment) throws InterruptedException{        // 模拟顺序异常,也就是insert后消费,这里线程sleep        Thread.sleep(1000);        String id = record.value();        log.info("接收到insert :: {}", id);        Lock lock = weakRefHashLock.lock(id);        lock.lock();        try {            log.info("开始处理 {} 的insert", id);            // 模拟 insert 业务处理            Thread.sleep(1000);            // 从缓存中获取 是否存在有update数据            if (UPDATE_DATA_MAP.containsKey(id)){                // 缓存数据存在,执行update                doUpdate(id);            }            log.info("处理 {} 的insert 结束", id);        }finally {            lock.unlock();        }        acknowledgment.acknowledge();    }    @KafkaListener(topics = "TOPIC_UPDATE")    public void update(ConsumerRecord record, Acknowledgment acknowledgment) throws InterruptedException{        String id = record.value();        log.info("接收到update :: {}", id);        Lock lock = weakRefHashLock.lock(id);        lock.lock();        try {            // 测试使用,不做数据库的校验            if (!DATA_MAP.containsKey(id)){                // 未找到对应数据,证明消费顺序异常,将当前数据加入缓存                log.info("消费顺序异常,将update数据 {} 加入缓存", id);                UPDATE_DATA_MAP.put(id, id);            }else {                doUpdate(id);            }        }finally {            lock.unlock();        }        acknowledgment.acknowledge();    }    void doUpdate(String id) throws InterruptedException{        // 模拟 update        log.info("开始处理update::{}", id);        Thread.sleep(1000);        log.info("处理update::{} 结束", id);    }}日志(代码中已模拟必现消费顺序异常的场景):接收到update ::1消费顺序异常,将update数据 1 加入缓存接收到insert ::1开始处理 1 的insert开始处理update::1处理update::1 结束处理 1 的insert 结束观察日志,此方案可正常处理不同Topic再存在数据关联的消费顺序问题。" linktype="text" imgurl="" imgdata="null" data-itemshowtype="0" tab="innerlink" data-linktype="2">日志(代码中已模拟必现消费顺序异常的场景):

 UPDATE_DATA_MAP = new ConcurrentHashMap<>();    // 数据存储    private Map DATA_MAP = new ConcurrentHashMap<>();    private WeakRefHashLock weakRefHashLock;    public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {        this.weakRefHashLock = weakRefHashLock;    }    @KafkaListener(topics = "TOPIC_INSERT")    public void insert(ConsumerRecord record, Acknowledgment acknowledgment) throws InterruptedException{        // 模拟顺序异常,也就是insert后消费,这里线程sleep        Thread.sleep(1000);        String id = record.value();        log.info("接收到insert :: {}", id);        Lock lock = weakRefHashLock.lock(id);        lock.lock();        try {            log.info("开始处理 {} 的insert", id);            // 模拟 insert 业务处理            Thread.sleep(1000);            // 从缓存中获取 是否存在有update数据            if (UPDATE_DATA_MAP.containsKey(id)){                // 缓存数据存在,执行update                doUpdate(id);            }            log.info("处理 {} 的insert 结束", id);        }finally {            lock.unlock();        }        acknowledgment.acknowledge();    }    @KafkaListener(topics = "TOPIC_UPDATE")    public void update(ConsumerRecord record, Acknowledgment acknowledgment) throws InterruptedException{        String id = record.value();        log.info("接收到update :: {}", id);        Lock lock = weakRefHashLock.lock(id);        lock.lock();        try {            // 测试使用,不做数据库的校验            if (!DATA_MAP.containsKey(id)){                // 未找到对应数据,证明消费顺序异常,将当前数据加入缓存                log.info("消费顺序异常,将update数据 {} 加入缓存", id);                UPDATE_DATA_MAP.put(id, id);            }else {                doUpdate(id);            }        }finally {            lock.unlock();        }        acknowledgment.acknowledge();    }    void doUpdate(String id) throws InterruptedException{        // 模拟 update        log.info("开始处理update::{}", id);        Thread.sleep(1000);        log.info("处理update::{} 结束", id);    }}日志(代码中已模拟必现消费顺序异常的场景):接收到update ::1消费顺序异常,将update数据 1 加入缓存接收到insert ::1开始处理 1 的insert开始处理update::1处理update::1 结束处理 1 的insert 结束观察日志,此方案可正常处理不同Topic再存在数据关联的消费顺序问题。" linktype="text" imgurl="" imgdata="null" data-itemshowtype="0" tab="innerlink" data-linktype="2">接收到update ::1
消费顺序异常,将update数据 1 加入缓存
接收到insert ::1
开始处理 1 的insert
开始处理update::1
处理update::1 结束
处理 1 的insert 结束

 UPDATE_DATA_MAP = new ConcurrentHashMap<>();    // 数据存储    private Map DATA_MAP = new ConcurrentHashMap<>();    private WeakRefHashLock weakRefHashLock;    public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {        this.weakRefHashLock = weakRefHashLock;    }    @KafkaListener(topics = "TOPIC_INSERT")    public void insert(ConsumerRecord record, Acknowledgment acknowledgment) throws InterruptedException{        // 模拟顺序异常,也就是insert后消费,这里线程sleep        Thread.sleep(1000);        String id = record.value();        log.info("接收到insert :: {}", id);        Lock lock = weakRefHashLock.lock(id);        lock.lock();        try {            log.info("开始处理 {} 的insert", id);            // 模拟 insert 业务处理            Thread.sleep(1000);            // 从缓存中获取 是否存在有update数据            if (UPDATE_DATA_MAP.containsKey(id)){                // 缓存数据存在,执行update                doUpdate(id);            }            log.info("处理 {} 的insert 结束", id);        }finally {            lock.unlock();        }        acknowledgment.acknowledge();    }    @KafkaListener(topics = "TOPIC_UPDATE")    public void update(ConsumerRecord record, Acknowledgment acknowledgment) throws InterruptedException{        String id = record.value();        log.info("接收到update :: {}", id);        Lock lock = weakRefHashLock.lock(id);        lock.lock();        try {            // 测试使用,不做数据库的校验            if (!DATA_MAP.containsKey(id)){                // 未找到对应数据,证明消费顺序异常,将当前数据加入缓存                log.info("消费顺序异常,将update数据 {} 加入缓存", id);                UPDATE_DATA_MAP.put(id, id);            }else {                doUpdate(id);            }        }finally {            lock.unlock();        }        acknowledgment.acknowledge();    }    void doUpdate(String id) throws InterruptedException{        // 模拟 update        log.info("开始处理update::{}", id);        Thread.sleep(1000);        log.info("处理update::{} 结束", id);    }}日志(代码中已模拟必现消费顺序异常的场景):接收到update ::1消费顺序异常,将update数据 1 加入缓存接收到insert ::1开始处理 1 的insert开始处理update::1处理update::1 结束处理 1 的insert 结束观察日志,此方案可正常处理不同Topic再存在数据关联的消费顺序问题。" linktype="text" imgurl="" imgdata="null" data-itemshowtype="0" tab="innerlink" style="color: rgb(58, 58, 58);" data-linktype="2">观察日志,此方案可正常处理不同Topic再存在数据关联的消费顺序问题。

版权声明:本文为CSDN博主「方片龙」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。原文链接:https://blog.csdn.net/qq_38245668/article/details/105900011









Spring Cloud 爆高危漏洞,赶紧修复!
2021 年发生的 10 件技术大事!!
23 种设计模式实战(很全)
Spring Boot 保护敏感配置的 4 种方法!
再见单身狗!Java 创建对象的 6 种方式
阿里为什么推荐使用 LongAdder?
重磅官宣:Redis 对象映射框架来了!!
别再写爆爆爆炸类了,试试装饰器模式!
程序员精通各种技术体系,45岁求职难!
Spring Boot 3.0 M1 发布,正式弃用 Java 8
Spring Boot 学习笔记,这个太全了!



关注Java技术栈看更多干货



获取 Spring Boot 实战笔记!
浏览 23
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报