面试必备!Kafka 怎么顺序消费?
前言
本文针对解决Kafka不同Topic之间存在一定的数据关联时的顺序消费问题。
1、问题引入
2、解决思路
现有Topic-insert和Topic-update,数据唯一标识为id,对于id=1的数据而言,要保证Topic-insert消费在前,Topic-update消费在后。想成为架构师,这份架构师图谱建议看看,少走弯路。
两个Topic的消费为不同线程处理,所以为了保证在同一时间内的同一数据标识的消息仅有一个业务逻辑在处理,需要对业务添加锁操作。
使用synchronized进行加锁的话,会影响无关联的insert和update的数据消费能力,如id=1的insert和id=2的update,在synchronized的情况下,无法并发处理,这是没有必要的,我们需要的是对于id=1的insert和id=1的update在同一时间只有一个在处理,所以使用细粒度锁来完成加锁的操作。
细粒度锁实现:https://blog.csdn.net/qq_38245668/article/details/105891161
PS:如果为分布式系统,细粒度锁需要使用分布式锁的对应实现。
3、实现方案
kafkaTemplate.send("TOPIC_INSERT", "1");
kafkaTemplate.send("TOPIC_UPDATE", "1");
监听代码示例:
record, Acknowledgment acknowledgment) throws InterruptedException{ // 模拟顺序异常,也就是insert后消费,这里线程sleep Thread.sleep(1000); String id = record.value(); log.info("接收到insert :: {}", id); Lock lock = weakRefHashLock.lock(id); lock.lock(); try { log.info("开始处理 {} 的insert", id); // 模拟 insert 业务处理 Thread.sleep(1000); // 从缓存中获取 是否存在有update数据 if (UPDATE_DATA_MAP.containsKey(id)){ // 缓存数据存在,执行update doUpdate(id); } log.info("处理 {} 的insert 结束", id); }finally { lock.unlock(); } acknowledgment.acknowledge(); } @KafkaListener(topics = "TOPIC_UPDATE") public void update(ConsumerRecordrecord, Acknowledgment acknowledgment) throws InterruptedException{ String id = record.value(); log.info("接收到update :: {}", id); Lock lock = weakRefHashLock.lock(id); lock.lock(); try { // 测试使用,不做数据库的校验 if (!DATA_MAP.containsKey(id)){ // 未找到对应数据,证明消费顺序异常,将当前数据加入缓存 log.info("消费顺序异常,将update数据 {} 加入缓存", id); UPDATE_DATA_MAP.put(id, id); }else { doUpdate(id); } }finally { lock.unlock(); } acknowledgment.acknowledge(); } void doUpdate(String id) throws InterruptedException{ // 模拟 update log.info("开始处理update::{}", id); Thread.sleep(1000); log.info("处理update::{} 结束", id); }}" linktype="text" imgurl="" imgdata="null" data-itemshowtype="0" tab="innerlink" data-linktype="2" wah-hotarea="click" hasload="1" style="outline: 0px; -webkit-tap-highlight-color: rgba(0, 0, 0, 0); cursor: pointer; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important;"> @Component
@Slf4j
public class KafkaListenerDemo {
// 消费到的数据缓存
private MapUPDATE_DATA_MAP = new ConcurrentHashMap<>();
// 数据存储
private MapDATA_MAP = new ConcurrentHashMap<>();
private WeakRefHashLock weakRefHashLock;
public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {
this.weakRefHashLock = weakRefHashLock;
}
@KafkaListener(topics = "TOPIC_INSERT")
public void insert(ConsumerRecordrecord, Acknowledgment acknowledgment) throws InterruptedException{
// 模拟顺序异常,也就是insert后消费,这里线程sleep
Thread.sleep(1000);
String id = record.value();
log.info("接收到insert :: {}", id);
Lock lock = weakRefHashLock.lock(id);
lock.lock();
try {
log.info("开始处理 {} 的insert", id);
// 模拟 insert 业务处理
Thread.sleep(1000);
// 从缓存中获取 是否存在有update数据
if (UPDATE_DATA_MAP.containsKey(id)){
// 缓存数据存在,执行update
doUpdate(id);
}
log.info("处理 {} 的insert 结束", id);
}finally {
lock.unlock();
}
acknowledgment.acknowledge();
}
@KafkaListener(topics = "TOPIC_UPDATE")
public void update(ConsumerRecordrecord, Acknowledgment acknowledgment) throws InterruptedException{
String id = record.value();
log.info("接收到update :: {}", id);
Lock lock = weakRefHashLock.lock(id);
lock.lock();
try {
// 测试使用,不做数据库的校验
if (!DATA_MAP.containsKey(id)){
// 未找到对应数据,证明消费顺序异常,将当前数据加入缓存
log.info("消费顺序异常,将update数据 {} 加入缓存", id);
UPDATE_DATA_MAP.put(id, id);
}else {
doUpdate(id);
}
}finally {
lock.unlock();
}
acknowledgment.acknowledge();
}
void doUpdate(String id) throws InterruptedException{
// 模拟 update
log.info("开始处理update::{}", id);
Thread.sleep(1000);
log.info("处理update::{} 结束", id);
}
}
接收到update ::1
消费顺序异常,将update数据 1 加入缓存
接收到insert ::1
开始处理 1 的insert
开始处理update::1
处理update::1 结束
处理 1 的insert 结束
观察日志,此方案可正常处理不同Topic再存在数据关联的消费顺序问题。
版权声明:本文为CSDN博主「方片龙」的原创文章,原文链接:https://blog.csdn.net/qq_38245668/article/details/105900011
-End-
全栈架构社区交流群
「全栈架构社区」建立了读者架构师交流群,大家可以添加小编微信进行加群。欢迎有想法、乐于分享的朋友们一起交流学习。
评论