实时数据仓库必备技术:Kafka 知识梳理
- 为什么使用消息队列? -
解耦
异步
削峰
(1) 解耦
引入消息队列后,系统A产生的数据直接发送到消息队列中,哪个系统需要系统A的数据就直接去消息队列中消费,这样系统A就和其他系统彻底解耦了。
(2) 异步
客户端调用A系统的一个接口处理某个功能,该功能需要调用B,C,D系统进行处理,如果A系统自身耗时为20ms,B,C,D系统耗时分别是300ms,450ms,200ms,最终接口返回时总共耗时970ms,这肯定是不可接受的,我们需要引入消息队列。
引入消息队列后,系统A将消息发送到消息队列中就可以直接返回,接口总共耗时很短,用户体验非常棒。
(3) 削峰
在高并发场景下(比如秒杀活动)某一刻的并发量会非常高,如果这些请求全部到达MySQL,会导致MySQL崩溃,这时我们需要引入消息队列,先将请求积压到消息队列中,让MySQL正常处理。
- 消息队列有什么优缺点 -
(1) 优点
解耦
异步
削峰
(2) 缺点
系统可用性降低,MQ一旦挂掉,整个系统就崩溃了。
系统复杂度提高,引入MQ后需要考虑一系列问题,比如消息丢失,重复消费,消息消费的顺序等等。
一致性问题,没有引入MQ之前有事务来保证一致性,引入MQ后如果某一步执行失败,这就导致数据不一致了。
ActiveMQ、RabbitMQ、Kafka、RocketMQ 优点和缺点
(1) ActiveMQ和RabbitMQ单击吞吐量是万级,Kafka和RocketMQ的单机吞吐量是10万级。
(2) 四种MQ的时效性,可用性,消息可靠性都很高。
(3) ActiveMQ的社区不太活跃,其他三种MQ的社区比较活跃。
(4) RabbitMQ是基于Erlang语言开发,对Java开发者不太友好。
(5) Kafka当topic数量达到1000时吞吐量会大幅度下降,而RocketMQ影响不太(这是RocketMQ相对于Kafka的一大优势)。
(6) Kafka的功能简单,吞吐量高,天然适合大数据实时计算以及日志采集。
- 如何保证消息队列的高可用 -
回答自己熟悉的消息队列,如Kafka。
Kafka是一个分布式的消息队列,一个topic有多个partition,每个partition分布在不同的节点上。此外,Kafka还可以为partition配置副本机制,一个主副本对外提供服务,多个从副本提供冷备功能(即只起备份作用,不提供读写)。
(1) 从副本为什么不提供读写服务,只做备份?
因为如果follower副本也提供写服务的话,那么就需要在所有的副本之间相互同步。n个副本就需要 n x n 条通路来同步数据,如果采用异步同步的话,数据的一致性和有序性是很难保证的,而采用同步方式进行数据同步的话,那么写入延迟其实是放大n倍的,反而适得其反。
(2) 从服务为什么不提供读服务呢?
这个除了因为同步延迟带来的数据不一致之外,不同于其他的存储服务(如ES,MySQL),Kafka的读取本质上是一个有序的消息消费,消费进度是依赖于一个叫做offset的偏移量,这个偏移量是要保存起来的。如果多个副本进行读负载均衡,那么这个偏移量就不好确定了。
总结一下,从副本不提供读写服务的原因就是很难保证数据的一致性与有序性,而且也没必要提供读写服务,Kafka是一个消息队列,副本的作用是保证消息不丢失。
partition主从副本数据同步
生产者发布消息到某个分区时,先通过ZooKeeper找到该分区的leader副本,然后将消息只发送给leader副本,leader副本收到消息后将其写入本地磁盘。接着每个follower副本都从leader副本上pull消息,follower副本收到消息后会向leader副本发送ACK(acknowledge)。一旦leader副本收到了 ISR (in-sync replicas) 中的所有副本的ACK,该消息就被认为已经commit了,然后leader副本向生产者发送ACK。消费者读消息只会从leader副本中读取,只有被commit过的消息才会暴露给消费者。
ISR(in-sync replicas)是与leader副本保持同步状态的follower副本列表,如果一段时间内(replica。lag。time。max。ms) leader副本没有收到follower副本的拉取请求,就会被leader副本从ISR中移除。ISR中的副本数必须大于等于 min。insync。replicas,否则producer会认为写入失败,进行消息重发。
主副本选举
当leader副本挂掉后,集群控制器(即Master节点)会从ISR中选出一个新的主副本(ISR中的第一个,不行就依次类推 )。
集群控制器选举
集群中的第一个broker通过在Zookeeper的 /controller 路径下创建一个临时节点来成为控制器,当其他broker启动时,也会试图创建一个临时节点,但是会收到“节点已存在”的异常,这样便知道集群控制器已存在。这些broker会监听Zookeeper的这个控制器临时节点,当控制器发生故障时,该临时节点会消失,这些broker便会收到通知,然后尝试去创建临时节点成为新的控制器。
如何保证消息不被重复消费(如何保证消息消费时的幂等性)?
(1) 导致消息重复消费的原因?分区重平衡消费者重启或宕机这两个原因都会导致消费者在消费消息后没有提交offset。
(2) 解决办法这个问题只能通过业务手段来解决,比如我们在消费前先查询数据库,判断是否已消费(status = 1),或消费后在Redis中做个记录,下次消费前先从Redis中判断是否已消费。
如果保证消息不丢失(如何保证消息的可靠性传输)?
(1) 导致消息丢失的原因?
kafka没有保存消息。消费者还没消费就提交了offset,然后消费者重启或宕机,分区重平衡。
(2) 解决办法
配置partition副本机制。
•default。replication。factor 每个分区的副本数必须大于1。
•min。insync。replicas 与主副本保存同步状态的从副本数必须大于等于1。
•Producer端的配置acks=all,指数据写入min。insync。replicas个从副本后才算写入成功。
•Producer端的配置retries=MAX(一个很大的值,表示无线重试的意思),指数据一旦写入失败,就无限重试。关闭自动提交offset,改为手动提交。先消费,消费成功后再手动提交offset。
- 如何保证消息的顺序性? -
kafka只保证单个分区内的消息有序,所以要想保证消息的顺序性,只能一个topic,一个partition,一个consumer。
如果在consumer端开多个线程来进行消费,如何保证消息的顺序性?
一个topic,一个partition,一个consumer,consumer内部单线程消费,写N个内存queue,然后开N个线程分别消费一个内存queue中的消息。
消息队列快写满了怎么办?
一般出现这种问题的原因就是消费端出了故障,导致无法消费或消费极慢,这时有两种解决办法,根据不同的场景选择不同的解决办法。
(1) 紧急扩容临时征用10倍的机器来部署consumer,新建一个topic,partition是原来的10倍。写一个临时分发数据的consumer程序,将积压的数据不做处理,直接分发给临时建好的topic。以10倍的速度消费积压的消息,消费完之后再恢复原来的部署。
(2) 批量重导写一个临时分发数据的consumer程序,将积压的数据直接丢弃。等高峰期过后,写个临时程序,将丢失的那批数据重新导入消息队列中。
如果让你自己写一个消息队列,该如何进行架构设计?
我们可以用Kafka的架构设计来回答这个问题。
(1) 分布式这个消息队列必须分布式的,这样通过水平扩展集群就可以增加消息队列的吞吐量与容量。分布式的消息队列必须要有一个master节点来管理整个集群,可以通过Zookeeper来实现master节点选举算法。
(2) 可用性一个topic必须支持多个partition,且partition数量可以增加,每个partition分布在不同的节点上。partition内通过offset来保证消息的顺序。同时为了保证可用性,每个分区必须设置副本,主副本提供读写服务,从副本只作备份即可。当主副本所在的节点宕机后,master节点会在从副本中选出一个作为主副本,然后当宕机的节点修复后,master节点会将缺失的副本分配过去,同步数据后,集群恢复正常。
(3) 高性能为了保证高吞吐量,我们可以使用批量压缩,顺序写,零拷贝技术。
(4) 解决消息丢失方案消息必须写入所有副本中才算写入成功。
- Kafka 为什么速度那么快? -
我们都知道Kafka的核心特性之一就是高吞吐率,但Kafka的数据是存储在磁盘上的,一般认为在磁盘上读写数据性能很低,那Kafka是如何做到高吞吐率的呢?
批量压缩
顺序写
零拷贝
Kafka高吞吐率的秘诀在于,它把所有的消息都进行批量压缩,提升网络IO,通过顺序写和零拷贝技术提升磁盘IO。
作者:椰子Tyshawn
来源:
https://blog.csdn.net/litianxiang_kaola/article/details/104138183