(二)Kafka生产优化之集群重要参数配置详解与优化设置
Kafka集群所有重要参数配置详解与优化设置
成文时间:2022年2月22日。参数如有变化,请以官网参数和默认值为准!
回顾上一篇的内容:(一)Kafka生产优化之如何全面规划 Kafka 的线上部署方案
今天为大家带来Kafka生产优化系列的第二篇 —— Kafka集群所有重要参数配置详解与优化设置。
飞哥已将Kafka集群中所有重要参数全部分类总结,并具体讲解每个参数的含义。
如果各位同学在面试或者生产中如果遇到Kafka性能优化方面的问题,可以参考此文配置进行具体优化。
由大海哥主讲的更详细的《Kafka3.0入门、源码、优化与配置》系列课程,可以B站搜索“尚硅谷”进行了解!也可复制以下链接到浏览器直接观看:【尚硅谷】2022版Kafka3.x教程(从入门到调优,深入全面)- https://b23.tv/vpUm1pm
Broker端配置详解
☆必配参数
官网提示:以下三个参数必配:
参数名 | 备注 | 默认值 |
---|---|---|
broker.id | 此参数必配,每台服务器都不相同且唯一。 | -1 |
log.dirs | 此参数必配,而且建议多目录。而且最好多个目录分属于不同的物理磁盘。 好处1:提高读写性能,多块物理磁盘能使读写具有更高的IO量; 好处2:实现故障转移,如果磁盘坏了,可以自动将数据转移到正常磁盘上,且Broker不会宕机。 如果这个参数没配,则使用 log.dir 中的参数配置,默认为/tmp/kafka-logs | null |
log.dir | 如果log.dirs 没配,则使用log.dir | null |
zookeeper.connect | 此参数必配。 格式为: hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 。例如我们经常配置的: hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka ,切记不要写成: hadoop102:2181/kafka,hadoop103:2181/kafka,hadoop104:2181/kafka | null |
监听器相关参数
参数名 | 备注 | 默认值 |
---|---|---|
listeners | 监听器,规定协议与访问主机名和端口,默认值PLAINTEXT://:9092 ,这里面需要配置的是一个三元组,格式为<协议名称://主机名:端口号> ,将hostName指定为0.0.0.0绑定到所有接口。将hostname留出为空绑定到默认接口。如果配置了自定义的协议名称,还需要配置listener.security.protocol.map 来告诉协议底层使用了何种安全协议。 | PLAINTEXT://:9092 |
advertised.listeners | 对外发布的监听器地址,为客户端使用,不同于listeners ,此参数允许端口重复,作负载均衡用。如果需要外网访问则会用到此参数,一般情况下,不需要设置! | null |
control.plane.listener.name | 配置监听器的名字,一般用不到,默认为null | null |
controller.listener.names | KRaft模式下有用,zk模式下用不到。 | null |
主题相关参数
参数名 | 备注 | 默认值 |
---|---|---|
auto.create.topics.enable | 测试环境中,这个参数通常开启,设置为true ,但是生产环境中一般关闭,设置为false。我们会发现一个现象,如果我们不创建主题,直接向名为“test”的主题发送消息,则kafka中会使用默认主题配置自动创建名为test的topic,但是这种情况一般在生产环境中不允许。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。生产环境建议将该参数设置为false 。 | true |
delete.topic.enable | 是否允许删除主题,默认为true ,一般不修改。一般会通过权限管理来限制部分用户删除主题。个别权限控制严格的公司可能会修改为false,但这样就无法删除topic,生产中建议不修改! | true |
线程相关参数
参数名 | 备注 | 默认值 |
---|---|---|
background.threads | 默认为10,后台处理任务的线程数。如果不出问题,默认不修改。 | 10 |
num.io.threads | 服务器用于处理请求的线程数,其可能包括磁盘I/O | 8 |
num.network.threads | 服务器用于接收来自网络的请求并向网络发送响应的线程数 | 3 |
num.recovery.threads.per.data.dir | 在启动时用于日志恢复和在关闭时刷新的每个数据目录的线程数 | 1 |
num.replica.alter.log.dirs.threads | 可以在日志目录之间移动副本的线程数,这可能包括磁盘I/O,不算常驻线程,无需设置。 | null |
num.replica.fetchers | 用于复制来自源代理的消息线程的数量。增加该值可以增加follower broker中的I/O平行度的程度。 | 1 |
压缩相关参数
生产中,一般Kafka会配置压缩以减少磁盘占用。
参数名 | 备注 | 默认值 |
---|---|---|
compression.type | 配置消息压缩格式,有以下几种:'gzip', 'snappy', 'lz4', 'zstd' ,生产环境中需要按照实际是否需要压缩进行配置。还可以配置 'uncompressed' 意为不压缩,还可以配置'producer' 意为和producer端的压缩格式保持一致。 | producer |
附:Facebook Zstandard官网提供的压缩算法对比结果:
Compressor name | Ratio | Compression | Decompress |
---|---|---|---|
zstd 1.3.4-1 | 2.877 | 470 MB/s | 1380 MB/s |
zlib 1.2.11-1 | 2.743 | 110 MB/s | 400 MB/s |
brotli 1.0.2-0 | 2.701 | 410 MB/s | 430 MB/s |
quicklz 1.5.0-1 | 2.238 | 550 MB/s | 710 MB/s |
lzo1x 2.09-1 | 2.108 | 650 MB/s | 830 MB/s |
lz4 1.8.1 | 2.101 | 750 MB/s | 3700 MB/s |
snappy 1.1.4 | 2.091 | 530 MB/s | 1800 MB/s |
lzf 3.6-1 | 2.077 | 400 MB/s | 860 MB/s |
ZooKeeper相关参数
ZK相关的参数一般不做修改。默认即可。
参数名 | 备注 | 默认值 |
---|---|---|
controller.quorum.election.backoff.max.ms | 开始新的选举前的最长时间(毫秒),防止选举陷入僵局。默认1000ms,不修改。 | 1000 |
controller.quorum.election.timeout.ms | 没有leader后触发选举的等待市场,默认1000ms,不修改 | 1000 |
controller.quorum.fetch.timeout.ms | 在成为候选者和触发投票选举之前从当前leader处拉取数据失败等待的最大时长,默认2000ms。不修改。 | 2000 |
controller.quorum.voters | 参与投票的服务器列表,逗号分割。需要填入非空的集合 | “” |
zookeeper.connection.timeout.ms | 客户端等待与Zookeeper建立连接的最大时间。 如果没有设置,则使用 zookeeper.session.timeout.ms 中的值 | null |
zookeeper.max.in.flight.requests | 客户端在阻塞前发送给Zookeeper的未确认请求的最大数量。 | 10 |
zookeeper.session.timeout.ms | ZK的会话超时时间 | 18000 |
重平衡与选举相关参数
重平衡相关参数需要根据实际需求进行调整,原理类似于HDFS中的重平衡。
参数名 | 备注 | 默认值 |
---|---|---|
unclean.leader.election.enable | 是否将未设置在ISR中的副本作为最后的手段来选举为leader,即使这样做可能会导致数据丢失。 这个参数默认为false。意为关闭“不干净的”leader选举。 Kafka有多个副本,每个分区都有多个副本,但只有一个leader副本对外提供服务。不是所有的副本都有资格竞选leader,只有保存数据量多的副本才有资格竞选,即所谓的ISR中的才能竞选。未在ISR中的副本被称为unclean的。 如果此参数设置为true,则未在ISR中的副本也可参与竞选,这样就会有丢失数据的风险。 生产环境中建议设置此值为false! | false |
auto.leader.rebalance.enable | 请注意,此参数在生产环境中极易被忽视。 设置为true则表示允许Kafka定期进行对一些topic的leader的重新选举。即便是之前的leader没有任何问题,也有可能在满足选举条件之后换leader。 但是在生产中换leader的成本极高,且没有性能收益,所以在生产中建议设置为false! | true |
leader.imbalance.check.interval.seconds | 检查各个分区是否平衡的频率,默认300s,不修改 | 300 |
leader.imbalance.per.broker.percentage | 触发重平衡的阈值百分比,默认为10。 举例,如果一个broker上有10个分区,有两个分区的leader不是preferred leader,比例超过10%,则会触发重平衡。 | 10 |
日志刷写相关参数
在Linux系统中,当我们把数据写入文件系统之后,其实数据在操作系统的pagecache里面,并没有刷到磁盘上。如果操作系统挂了,数据就丢失了。一方面,应用程序可以调用fsync这个系统调用来强制刷盘,另一方面,操作系统有后台线程,定时刷盘。频繁调用fsync会影响性能,需要在性能和可靠性之间进行权衡。实际上,官方不建议通过上述的三个参数来强制写盘,认为数据的可靠性通过replica来保证,而强制flush数据到磁盘会对整体性能产生影响。
Kafka的持久性并非要求同步数据到磁盘,因为问题节点都是从副本中恢复数据。这样刷盘依赖操作系统及Kafka的后台刷盘机制。这样的好处是:无需调优、高吞吐量、低延时和可全量恢复。
操作系统一般默认30s刷盘一次。
参数名 | 备注 | 默认值 |
---|---|---|
log.flush.interval.messages | 刷写数据到磁盘的消息条数间隔,默认值为Long的最大值9223372036854775807,不建议修改! | 9223372036854775807 |
log.flush.interval.ms | 消息在刷新到磁盘之前保存在内存中的最大时间(以ms为单位)。如果没有设置,则使用log.flush.scheduler.interval.ms 中的值,默认null | null |
log.flush.scheduler.interval.ms | 日志刷新器检查是否有日志需要刷新到磁盘的频率(以ms为单位),默认值Long的最大值9223372036854775807,不建议修改! | 9223372036854775807 |
log.flush.offset.checkpoint.interval.ms | 更新作为日志恢复点的上次刷新的持久记录的频率,默认60000ms(1min)。 | 60000 |
log.flush.start.offset.checkpoint.interval.ms | 更新日志开始偏移量的持久记录的频率,默认60000ms(1min)。 | 60000 |
日志保留相关参数
日志保留相关参数需要根据具体的生产实际及磁盘容量与数据量进行调整。
参数名 | 备注 | 默认值 |
---|---|---|
log.retention.bytes | 删除日志前的日志大小,此参数一般不用,默认值为-1。表示在当前Broker上保存的数据容量不受限制。 如果你所在的公司是一个对外提供云服务的公司,需要做租户管理,那么这个参数就有设置的必要性,可以为单租户设置Kafka保存消息的容量上限。 | -1 |
log.retention.hours | 日志保留的时长,默认168小时,7天 | 168 |
log.retention.minutes | 日志保留的分长,默认null | null |
log.retention.ms | 日志保留的毫秒值。默认null,如果设置为-1,那么日志无限期保留。 以上三个参数的优先级为: ms > minutes > hours ,如果没有配ms,找minutes,如果没有minutes找hours | null |
日志滚动切片相关参数
日志滚动与切片参数建议根据生产实际进行调整。
参数名 | 备注 | 默认值 |
---|---|---|
log.roll.hours | 优先级小于log.roll.ms ,日志滚动的最大时长,默认值168hours,也就是7days | 168 |
log.roll.ms | 日志滚动的最大时长,默认值null,这个值未设置会去找log.roll.hours | null |
log.roll.jitter.hours | 给日志段的切分加一个扰动值,避免大量日志段在同一时间进行切分操作。 如果发现kafka有周期性的磁盘I/O打满情况,建议设置此值。 | 0 |
log.roll.jitter.ms | 同上一个参数,优先级高于log.roll.jitter.hours | null |
log.segment.bytes | 单个log文件的最大大小,默认1073741824,也就是1G | 1073741824 |
log.segment.delete.delay.ms | 从文件系统中删除文件之前等待的时间 | 60000 (1 minute) |
元数据相关参数
元数据相关参数一般不做调整。
参数名 | 备注 | 默认值 |
---|---|---|
metadata.log.dir | 这个配置决定了我们在KRaft模式下将集群的元数据日志放在哪里。 如果没有设置,元数据日志将放在log.dirs的第一个日志目录中。如果不是KRaft模式,元数据在ZK中 | null |
metadata.log.max.record.bytes.between.snapshots | 这是在生成新快照之前,日志中从最新快照到高水位之间的最大字节数。默认20MB | 20971520 |
metadata.log.segment.bytes | 单个元数据日志文件的最大大小。默认1GB | 1073741824 |
metadata.log.segment.ms | 新元数据日志文件滚出之前的最大时间(以毫秒为单位)。默认7天。 | 604800000 |
metadata.max.retention.bytes | 删除旧快照和日志文件前元数据日志和快照的最大组合大小。因为在删除任何日志之前必须至少存在一个快照,所以这是一个软限制。 | -1 |
metadata.max.retention.ms | 在删除元数据日志文件或快照之前保存该文件或快照的毫秒数。因为在删除任何日志之前必须至少存在一个快照,所以这是一个软限制。默认7天。 | 604800000 |
副本相关参数
副本相关参数一般不做调整。
参数名 | 备注 | 默认值 |
---|---|---|
min.insync.replicas | 当生产者将ack设置为“all”(或“-1”)时,min.insync.replicas 指定必须确认写入成功的最小副本数量。如果无法满足此最小值,则生产者将引发异常(NotEnoughReplicas 或NotEnoughReplicasApend )此参数和ack设置配合使用可以允许您强制执行更大的耐用性保证。典型的方案是创建一个主题,复制因子为3,设置 min.insync.replicas 为2,并使用ack级别为all,如果大多数副本没有收到写入,这将确保生产者提出异常。 | 1 |
replica.fetch.min.bytes | 每次读取响应所需的最小字节数。如果没有足够的字节,则等待下面的replica.fetch.wait.max.ms 参数 | 1 |
replica.fetch.wait.max.ms | follower副本发出的每个获取器请求的最大等待时间。该值在任何时候都应该小于replica.lag.time.max.ms ,以防止低吞吐量主题的ISR频繁收缩 | 500 |
replica.high.watermark.checkpoint.interval.ms | 将高水位保存到磁盘的频率 | 5000 |
replica.lag.time.max.ms | 如果follower没有发送任何fetch请求,或者没有消耗leader日志结束偏移量,leader将从ISR中删除follower | 30000 |
replica.socket.receive.buffer.bytes | 用于网络请求的套接字接收缓冲区,64K | 65536 |
replica.socket.timeout.ms | 网络请求的套接字超时。它的值至少应该是replica.fetch.wait.max.ms | 30000 |
offset相关参数
offset内部主题相关参数,一般保持默认即可。
参数名 | 备注 | 默认值 |
---|---|---|
offset.metadata.max.bytes | 配置offset请求的最大请求消息大小,默认4K | 4096 |
offsets.commit.required.acks | 配置提交offset请求的ack值,默认-1 | -1 |
offsets.commit.timeout.ms | 配置提交offset的最长等待时间,5s | 5000 |
offsets.load.buffer.size | 用于读取offset信息到内存cache时候,读取缓冲区的大小,默认5M。 | 5242880 |
offsets.retention.check.interval.ms | 定期检查offset过期数据的周期,默认600秒。 | 600000 |
offsets.retention.minutes | 针对一个offset的消费记录的最长保留时间,默认为10080分钟,即7天。 | 10080 |
offsets.topic.compression.codec | offset主题的压缩解码器,可以用来实现原子提交 | 0 |
offsets.topic.num.partitions | offset主题的默认分区数,默认50个,这与我们看到的默认offset主题中的分区数一致。 | 50 |
offsets.topic.replication.factor | offset主题的副本因子,默认3个。低于这个数值,offset主题将创建失败。 | 3 |
offsets.topic.segment.bytes | offset字节应该保持相对较小,以促进更快的日志压缩和缓存负载,默认100M | 104857600 |
消息相关参数
参数名 | 备注 | 默认值 |
---|---|---|
message.max.bytes | Kafka允许的最大消息批次大小,如果启用了压缩,那么计算压缩之后的大小。 在老版本的Kafka中,如果消息未压缩,则不会进行批处理,则此值设置的为单条消息的大小上限。 最新官方文档中默认数值为1048588,刚刚比1M多出来12Bytes。 需要注意,如果在实际生产中,你的单条消息超过了1M,则必须增大此值。 一般地,在生产环境中,尤其是有接收较大单条数据的场景中,为了防止该值过小造成接收数据失败,均需将此值调大。 | 1048588 |
Topic级别配置详解
topic级别的参数,一般都在broker中对应有默认配置,但是也可以对单独的topic进行设置,可以在topic创建之初使用--config
来进行指定,也可以在创建完成之后再进行修改。
以下是比较重要的topic级别的参数配置。
日志清理压缩相关参数
参数名 | 备注 | 默认值 | 未指定时参考配置 |
---|---|---|---|
cleanup.policy | 指定过期日志使用的保留策略,默认为delete将删除过期日志段,设置为compact将会进行压缩。可选值为delete或者compact | delete | log.cleanup.policy |
compression.type | 参考broker设置中的压缩,可选值为[uncompressed, zstd, lz4, snappy, gzip, producer] | producer | compression.type |
delete.retention.ms | 配此配置专门针对tombstone类型的消息进行设置,默认为86400000,也就是1天,这个tombstone在当次compact完成后并不会被清理,在下次compact时候,最后的修改时间加上此配置时间值大于当前时间才会被删除。 | 86400000 | log.cleaner.delete.retention.ms |
file.delete.delay.ms | 从文件系统中删除文件前的等待时间 | 60000 | log.segment.delete.delay.ms |
min.cleanable.dirty.ratio | 默认值为0.5,此配置项控制日志压缩的比率,比率越高,则压缩的日志越少。 | 0.5 | log.cleaner.min.cleanable.ratio |
min.compaction.lag.ms | 此参数配合上面的min.cleanable.dirty.ratio 使用。配置该值后,如果上述压缩比率满足,且日志在该值持续时间内有未压缩记录,则日志符合压缩条件。 | 0 | log.cleaner.min.compaction.lag.ms |
max.compaction.lag.ms | 消息在日志中不符合压缩条件的最长时间。仅适用于正在压缩的日志。配置该值后,如果日志在该值持续时间内有未压缩记录,则日志符合压缩条件。 | 9223372036854775807 | log.cleaner.max.compaction.lag.ms |
日志刷写相关参数
参数名 | 备注 | 默认值 | 未指定时参考配置 |
---|---|---|---|
flush.messages | 强制刷写的消息条数,建议不要设置此值!! 相关说明参考broker端配置。 | 9223372036854775807 | log.flush.interval.messages |
flush.ms | 强制刷写的时间间隔,建议不要设置此值!! 相关说明参考broker端配置。 | 9223372036854775807 | log.flush.interval.ms |
索引相关参数
参数名 | 备注 | 默认值 | 未指定时参考配置 |
---|---|---|---|
index.interval.bytes | 索引的间隔,我们知道Kafka为稀疏索引,默认值为4096,如果设置较小的值,则索引间隔变短,索引的容量会变大,但能更快命中。建议不修改,默认即可。 | 4096 | log.index.interval.bytes |
消息、副本与选举相关参数
参数名 | 备注 | 默认值 | 未指定时参考配置 |
---|---|---|---|
max.message.bytes | 参考broker端的message.max.bytes 配置项,设置单条消息的大小上限。建议在生产中调大! | 1048588 | message.max.bytes |
message.timestamp.type | 定义消息中的时间戳取创建时间还是日志追加时间,可选值为[CreateTime, LogAppendTime] ,默认为CreateTime ,无需修改。 | CreateTime | log.message.timestamp.type |
min.insync.replicas | 具体解释参考broker端配置项min.insync.replicas | 1 | min.insync.replicas |
unclean.leader.election.enable | 具体解释参考broker端配置项unclean.leader.election.enable | false | unclean.leader.election.enable |
日志保留与滚动相关参数
参数名 | 备注 | 默认值 | 未指定时参考配置 |
---|---|---|---|
retention.bytes | 具体解释参考broker端配置项log.retention.bytes | -1 | log.retention.bytes |
retention.ms | 具体解释参考broker端配置项log.retention.ms | 604800000 | log.retention.ms |
segment.bytes | 具体解释参考broker端配置项log.segment.bytes | 1073741824 | log.segment.bytes |
segment.index.bytes | 具体解释参考broker端配置项log.index.size.max.bytes | 10485760 | log.index.size.max.bytes |
segment.jitter.ms | 具体解释参考broker端配置项log.roll.jitter.ms | 0 | log.roll.jitter.ms |
segment.ms | 具体解释参考broker端配置项log.roll.ms | 604800000 | log.roll.ms |
Producer端配置详解
Producer负责向服务器发送数据,在实际生产中,更多的是使用API作为Producer端进行数据发送。
以下是Producer端中比较重要的配置参数。
序列化、分区器、拦截器相关参数
参数名 | 备注 | 默认值 |
---|---|---|
key.serializer | 需填写key的序列化器,实现org.apache.kafka.common.serialization.Serializer 接口 | null |
value.serializer | 需填写value的序列化器,实现org.apache.kafka.common.serialization.Serializer 接口 | null |
partitioner.class | 分区器的全类名。可以通过实现org.apache.kafka.clients.producer.Partitioner 接口自定义分区器。 | org.apache.kafka.clients.producer.internals.DefaultPartitioner |
interceptor.classes | 拦截器全类名,可以通过实现org.apache.kafka.clients.producer.ProducerInterceptor 接口自定义拦截器 | null |
集群地址与压缩相关参数
参数名 | 备注 | 默认值 |
---|---|---|
bootstrap.servers | 须填写Kafka集群的地址与端口,如果包含多个broker,可以只填写其中一个,但是为预防填写单个broker产生宕机,建议填写全部可连接broker地址 | null |
compression.type | 压缩格式,默认为none。因为broker端压缩配置默认为producer,意为与producer保持一致。可选值为[none, zstd, lz4, snappy, gzip] 。设置压缩可以提高消息发送效率。 | none |
批处理相关参数
参数名 | 备注 | 默认值 |
---|---|---|
buffer.memory | 生产者用于将消息发送至服务器的总缓冲区大小,如果消息发送的速度超过了传送到服务器的速度,生产者将会阻塞max.block.ms 时间,之后抛出异常。默认值为33554432,也就是32M。 生产实际中 ,如果单条消息过大,需要调整该参数。 | 33554432 |
batch.size | 此参数十分重要! 默认16K。 如果消息发往同一个分区,producer端将会把消息以批处理的形式发送,一个批次的大小由该参数控制! 如果消息大小大于该值,则消息不会以批处理形式发送。 该值不宜设置过大,太大的批会浪费内存空间。 如果一批数据未达到该值,则该批数据会等待 linger.ms 时间后发送。为了提升处理效率,Kafka将从缓冲区中预先分配 batch.size 大小的内存空间。生产中一般会根据实际消息大小,将 buffer.memory 和batch.size 配合调整。 | 16384 |
linger.ms | 一批消息发送之前的等待时长,默认为0。合理设置此时间将有助于有效填满批次,提高效率。但是如果每次一批数据都能攒满发送,设置此值将会造成整体消息延迟。生产中需要根据具体需求设置。 | 0 |
max.request.size | 单个请求的最大值。如果单个消息过大,需要调整该值。 | 1048576 |
TCP缓冲相关参数
参数名 | 备注 | 默认值 |
---|---|---|
receive.buffer.bytes | TCP接收缓冲大小,默认不修改 | 32768 |
send.buffer.bytes | TCP发送缓冲大小,默认不修改 | 131072 |
消息有序性相关参数
参数名 | 备注 | 默认值 |
---|---|---|
retries | 此值大于0,客户端将会重新发送失败的消息。如果max.in.flight.requests.per.connection 的值不为1,则重试有可能会改变消息的有序性。一般这个值默认即可,不需要调整,重试由delivery.timeout.ms 参数控制。 | 2147483647 |
acks | 重要参数!ack级别决定了一个请求完成之前,生产者要求leader返回的确认数。 可选值有 [all, -1, 0, 1] 。acks为0表示生产者不会等待服务器确认,这样不能保证服务器已经收到消息。 acks为1表示leader已经确认,但是未等待follower确认。 ack为all或者-1需要等待ISR中的所有副本确认,保证数据不丢失。 | all |
enable.idempotence | 默认为true。生产者将确保每个消息精准写入一次。如果设置为false,可能因为broker端失败而重复写入消息。如果要启用幂等,需要满足:max.in.flight.requests.per.connection 小于等于5,retries 大于0,acks 为all。 | true |
max.in.flight.requests.per.connection | 客户端在单个连接上发送的未确认请求的最大数量。如果此值大于1,且enable.idempotence 为false,则消息有可能因重试而产生乱序。 | 5 |
事务相关参数
参数名 | 备注 | 默认值 |
---|---|---|
transactional.id | 如果需要启用事务,需要分配该值。默认为null | null |
Consumer端配置详解
Consumer对数据进行消费,一般通过流处理框架对Kafka中的数据进行消费处理,因此,Consumer端是否消费正常对于数据处理显得尤为重要。
以下是Consumer端的重要参数。
必配参数
参数名 | 备注 | 默认值 |
---|---|---|
bootstrap.servers | 须填写Kafka集群的地址与端口,如果包含多个broker,可以只填写其中一个,但是为预防填写单个broker产生宕机,建议填写全部可连接broker地址 | null |
group.id | 规定消费者属于哪个消费者组。如果需要使用Topic,Partition和Group来限定消费offset,则此值必须设置! | null |
auto.offset.reset | Kafka中没有初始偏移量的时候,或者当前偏移量在服务器上没有时候的消费策略。 可选值: [latest, earliest, none] 。1.earliest会重置到最早偏移量。 2.latest重置为最近偏移量。 3.设置为none时候,如果消费者所在的组没有找到之前的偏移量,则抛出异常。 | latest |
反序列化、拦截器相关参数
参数名 | 备注 | 默认值 |
---|---|---|
key.deserializer | key的反序列化器,需要实现org.apache.kafka.common.serialization.Deserializer 接口 | null |
value.deserializer | value的反序列化器,需要实现org.apache.kafka.common.serialization.Deserializer 接口 | null |
interceptor.classes | 拦截器的类。可通过实现org.apache.kafka.clients.consumer.ConsumerInterceptor 进行自定义。 | null |
自动提交offset参数
参数名 | 备注 | 默认值 |
---|---|---|
enable.auto.commit | 设置是否周期性自动提交offset。如果在代码中手动维护偏移量,则需要将此值设置为false。 | true |
auto.commit.interval.ms | 如果设置了自动提交enable.auto.commit 为true,那么该参数生效,提交间隔默认为5秒。 | 5000 |
拉取数据相关参数
参数名 | 备注 | 默认值 |
---|---|---|
fetch.min.bytes | 服务器在获取请求时返回的最小数据量,默认为1,意味着只要有一个字节的数据可用,就会立即相应请求,调大该值将会让服务器等待接受更多数据,但是会略微增加延迟。 | 1 |
fetch.max.bytes | 服务器在获取请求时返回的最大数据量。默认50M。 如果Kafka发送的每条消息都比这个数值要大,那只会返回非空分区中的第一条数据。 Kafka中所能接收的最大消息大小受两个参数控制: 1.broker端参数 message.max.bytes 2.topic级别参数 max.message.bytes | 52428800 |
fetch.max.wait.ms | 如果没有足够的数据满足fetch.min.bytes ,服务器将等待的最大时长。 | 500 |
max.partition.fetch.bytes | 默认1M。 服务器将返回每个分区的最大数据量。 | 1048576 |
分区分配策略参数
参数名 | 备注 | 默认值 |
---|---|---|
partition.assignment.strategy | 消费者的分区分配策略。有以下几种:org.apache.kafka.clients.consumer.RangeAssignor ,org.apache.kafka.clients.consumer.RoundRobinAssignor ,org.apache.kafka.clients.consumer.StickyAssignor ,org.apache.kafka.clients.consumer.CooperativeStickyAssignor 。第一种按照消费者总数和分区总数进行均分, 第二种为轮循分配, 第三种为黏性分区, 第四种与第三种类似,但是多了再平衡。 默认为第一种。 也可以自定义,只需要实现 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor 接口 | class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor |
receive.buffer.bytes | TCP接收缓冲大小,默认不修改 | 65536 |
TCP缓冲及其他参数
参数名 | 备注 | 默认值 |
---|---|---|
receive.buffer.bytes | TCP接收缓冲大小,默认不修改 | 65536 |
send.buffer.bytes | TCP发送缓冲大小,默认不修改 | 131072 |
exclude.internal.topics | 是否排除内部主题。 我们都知道Kafka有一个内部主题为 __consumer_offsets ,如果使用正则方式订阅主题且匹配到了内部主题,则该参数控制是否将内部主题暴露给消费者。如果为false,则只能通过订阅的方式来消费内部topic。 | true |
allow.auto.create.topics | 允许自动创建topic。需要broker端auto.create.topics.enable 参数为true才能使用。此参数默认即可。 | true |
如有建议和意见,欢迎在留言区评论!如果觉得本文对你有帮助,欢迎分享转发!
Kafka生产优化系列回顾: