专为小白打造—Kafka一篇文章入门
共 15463字,需浏览 31分钟
·
2023-10-28 20:12
Kafka 是MQ消息队列作为最常用的中间件之一,其主要特性有:解耦、异步、限流/削峰。
2.1 Topic与Partition
2.2 Broker与Partition
2.3 生产者消费者与ZooKeeper
2.4 消费者组
消费者与消费组这种模型可以让整体的消费能力具备横向伸缩性,我们可以增加(或减少) 消费者的个数来提高(或降低)整体的消费能力。对于分区数固定的情况,一味地增加消费者 并不会让消费能力一直得到提升,如果消费者过多,出现了消费者的个数大于分区个数的情况, 就会有消费者分配不到任何分区。参考下图(右下),一共有 8 个消费者,7 个分区,那么最后的消费 者 C7 由于分配不到任何分区而无法消费任何消息。
2.5 ISR、HW、LEO
3.1 注册信息
3.2 创建主题
3.3 生产者发送数据
3.3.1 封装ProducerRecord
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
3.3.2 序列化数据、获取元数据、确定分区
-
如果ProducerRecord中指定了要发往那个分区,则选择用户使用的分区 -
如果没有指定分区,则查看ProducerRecord中key是否为空,如果不为空则对key进行计算以获取使用那个分区 -
如果key也为空,则按照轮询的方式发送至不同的分区
3.3.3 写入缓冲区、分批分送消息
首先客户端发送请求全部会先发送给一个Acceptor,broker里面会存在3个线程(默认是3个),这3个线程都是叫做processor,Acceptor不会对客户端的请求做任何的处理,直接封装成一个个socketChannel发送给这些processor形成一个队列,发送的方式是轮询,就是先给第一个processor发送,然后再给第二个,第三个,然后又回到第一个。消费者线程去消费这些socketChannel时,会获取一个个request请求,这些request请求中就会伴随着数据。
线程池里面默认有8个线程,这些线程是用来处理request的,解析请求,如果request是写请求,就写到磁盘里。读的话返回结果。processor会从response中读取响应数据,然后再返回给客户端。这就是Kafka的网络三层架构。
所以如果我们需要对kafka进行增强调优,增加processor并增加线程池里面的处理线程,就可以达到效果。request和response那一块部分其实就是起到了一个缓存的效果,是考虑到processor们生成请求太快,线程数不够不能及时处理的问题。
3.4 消费者消费数据
-
信息注册阶段,即整个消费者组向集群注册消费信息等 -
信息消费阶段,开始信息消息,确保消息可靠性等
3.4.1 信息注册
3.4.2 消费信息
1)拉取消息
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, String> record : records) {
int updateCount = 1;
if (map.containsKey(record.value())) {
updateCount = (int) map.get(record.value() + 1);
}
map.put(record.value(), updateCount);
}
}
}finally {
consumer.close();
}
2)反序列化与消费消息
3)提交消息位移
-
重复消费。例如从offset为21开始拉取数据,拉取到了40,但是当消费者处理到第30条数据的时候系统宕机了,那么此时已经提交的offset仍为21,当节点重新连接时,仍会从21消费,那么此时21-30的数据就会被重新消费。还有一种情况是再均衡时,例如有新节点加入也会引发类似的问题。 -
消息丢失。
public static void main(String[] args) {
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
// 模拟消息的处理逻辑
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
try {
//处理完当前批次的消息,在轮询更多的消息之前,调用commitSync方法提交当前批次最新的消息
consumer.commitSync();
} catch (CommitFailedException e) {
//todo 事务回滚
e.printStackTrace();
}
}
}
public void asynCommit1(){
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
consumer.commitAsync();
}
}
public void asynCommit2(){
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
// 异步回调机制
consumer.commitAsync(new OffsetCommitCallback(){
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception!=null){
System.out.println(String.format("提交失败:%s", offsets.toString()));
}
}
});
}
}
public void asynCommit3(){
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
consumer.commitAsync((offsets, exception) ->{
if (exception!=null){
System.out.println(String.format("提交失败:%s", offsets.toString()));
}
});
}
}
while (true) {
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
log.trace("Kafka消费信息ConsumerRecord={}",record.toString());
}
try {
//先使用异步提交机制
consumer.commitAsync();
} catch (CommitFailedException e) {
// todo 补偿机制
log.error("commitAsync failed", e)
} finally{
try {
//再使用同步提交机制
consumer.commitSync();
} catch (CommitFailedException e) {
// todo 补偿机制
log.error("commitAsync failed", e)
} finally{
consumer.close();
}
}
}
4.1 异常重试
JMQ在消费过程中如果有未捕获的异常会认为消息消费失败,会首先在本地重试两次后放入重试队列中,进入重试队列的消息,会有过期逻辑,当超过重试时间或者超过最大重试次数后(默认3天过期),消息将会被丢弃。因此在处理消息时需要考虑如果出现异常后的处理场景,选择是重试还是忽略还是记录数据后告警。
//每隔1min拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(60L));
for (ConsumerRecord<String, String> record : records) {
try {
//doing
} catch (Exception e) {
//如果此处未捕获消息,会直接导致for循环退出,后续所有消息都将丢失
log.error("Bdp监听任务执行失败, taskName:{}", taskName, e);
}
}
4.2 本地重试与服务端重试
重试分为本地重试和服务端重试
<jmq:consumer id="apiConsumer" transport="jmq.apilog.transport">
<!--配置间隔1秒,重试3次-->
<jmq:listener topic="${jmq.topic.apilog}" listener="apiLogMessageListener" retryDelay="1000" maxRetrys="3"/>
</jmq:consumer>
-end-