斗转星移 | 三万字总结Kafka各个版本差异

共 32392字,需浏览 65分钟

 ·

2020-09-07 19:58

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

大数据技术与架构
点击右侧关注,大数据开发领域最强公众号!

暴走大数据
点击右侧关注,暴走大数据!



Kafka 2.0.0引入了线程协议的变化。通过遵循下面建议的滚动升级计划,您可以保证在升级期间不会出现停机。但是,请在升级之前查看2.0.0中重大更改

对于滚动升级:

  1. 更新所有代理上的server.properties并添加以下属性。CURRENT_KAFKA_VERSION指的是您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION指的是当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果要从0.11.0.x之前的版本升级,则应将CURRENT_MESSAGE_FORMAT_VERSION设置为与CURRENT_KAFKA_VERSION匹配。如果要从0.11.0.x,1.0.x或1.1.x升级并且尚未覆盖消息格式,则只需覆盖代理间协议格式。

    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(0.11.0,1.0,1.1)。

    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2,0.9.0,0.10.0,0.10.1,0.10.2,0.11.0,1.0,1.1)。

    • log.message.format.version = CURRENT_MESSAGE_FORMAT_VERSION

  2. 一次升级一个代理:关闭代理,更新代码并重新启动它。

  3. 升级整个群集后,通过编辑inter.broker.protocol.version并将其设置为2.0来破坏协议版本。

  4. 逐个重新启动代理以使新协议版本生效。

  5. 如果您已按照上面的说明覆盖了消息格式版本,则需要再执行一次滚动重新启动以将其升级到其最新版本。将所有(或大多数)使用者升级到0.11.0或更高版本后,在每个代理上将log.message.format.version更改为2.0并逐个重新启动它们。请注意,旧的Scala使用者不支持0.11中引入的新消息格式,因此为了避免下转换的性能成本(或者只利用一次语义),必须使用较新的Java使用者。

其他升级说明:

  1. 如果您愿意接受停机时间,您可以简单地关闭所有代理,更新代码并重新启动它们。默认情况下,它们将以新协议开始。

  2. 在升级代理后,可以随时进行协议版本的碰撞并重新启动。它不一定要立即。同样适用于消息格式版本。

  3. 如果您在Kafka Streams代码中使用Java8方法引用,则可能需要更新代码以解决方法歧义。仅交换jar文件可能不起作用。

  4. 不应将ACL添加到前缀资源(在KIP-290中添加),直到集群中的所有代理都已更新。

    注意:如果群集再次降级,则即使在群集完全升级后,也会忽略添加到群集的任何带前缀的ACL。

2.0.0中的显着变化

  • KIP-186将默认偏移保留时间从1天增加到7天。这使得在不经常提交的应用程序中“丢失”偏移的可能性降低。它还会增加活动的偏移量,因此可以增加代理上的内存使用量。请注意,控制台使用者当前默认启用偏移提交,并且可以是大量偏移的来源,此更改现在将保留7天而不是1.您可以通过将代理配置设置offsets.retention.minutes为1440 来保留现有行为。

  • 已经删除了对Java 7的支持,Java 8现在是所需的最低版本。

  • 默认值ssl.endpoint.identification.algorithm已更改为https,执行主机名验证(否则可能是中间人攻击)。设置ssl.endpoint.identification.algorithm为空字符串以恢复以前的行为。

  • KAFKA-5674将较低的间隔扩展max.connections.per.ip minimum为零,因此允许对入站连接进行基于IP的过滤。

  • KIP-272 为指标添加了API版本标记kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...}。此指标现在变为kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...},version={0|1|2|3|...}。这将影响不自动聚合的JMX监视工具。要获取特定请求类型的总计数,需要更新该工具以跨不同版本进行聚合。

  • KIP-225将度量标准“records.lag”更改为使用主题和分区标记。名称格式为“{topic} - {partition} .records-lag”的原始版本已被删除。

  • 自0.11.0.0以来已弃用的Scala使用者已被删除。自0.10.0.0以来,Java使用者一直是推荐的选择。请注意,即使经纪商升级到2.0.0,1.1.0(及更早版本)中的Scala使用者也将继续工作。

  • 自0.10.0.0以来已弃用的Scala生成器已被删除。自0.9.0.0以来,Java生产者一直是推荐的选择。请注意,Java生成器中的默认分区程序的行为与Scala生成器中的默认分区程序不同。迁移用户应考虑配置保留先前行为的自定义分区程序。请注意,即使代理升级到2.0.0,1.1.0(及更早版本)中的Scala生成器也将继续工作。

  • MirrorMaker和ConsoleConsumer不再支持Scala使用者,他们总是使用Java使用者。

  • ConsoleProducer不再支持Scala生成器,它总是使用Java生成器。

  • 已删除了许多依赖于Scala客户端的已弃用工具:ReplayLogProducer,SimpleConsumerPerformance,SimpleConsumerShell,ExportZkOffsets,ImportZkOffsets,UpdateOffsetsInZK,VerifyConsumerRebalance。

  • 已弃用的kafka.tools.ProducerPerformance已被删除,请使用org.apache.kafka.tools.ProducerPerformance。

  • upgrade.from添加了新的Kafka Streams配置参数,允许从旧版本滚动退回升级。

  • KIP-284通过将其默认值设置为更改了Kafka Streams重新分区主题的保留时间Long.MAX_VALUE

  • 更新ProcessorStateManager了Kafka Streams中的API,用于将状态存储注册到处理器拓扑。有关更多详细信息,请阅读Streams 升级指南

  • 在早期版本中,Connect的worker配置需要internal.key.converterinternal.value.converter属性。在2.0中,不再需要这些,并且默认为JSON转换器。您可以安全地从Connect独立和分布式工作器配置中删除这些属性:
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=falseinternal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter.schemas.enable=false

  • KIP-266添加了一个新的使用者配置,default.api.timeout.ms 以指定用于KafkaConsumer可能阻止的API 的默认超时。KIP还为这样的阻塞API添加了重载,以支持指定每个阻塞API使用的特定超时,而不是使用默认超时设置default.api.timeout.ms。特别是,poll(Duration)添加了一个新的API,它不会阻止动态分区分配。旧poll(long)API已弃用,将在以后的版本中删除。重载还添加了其他KafkaConsumer的方法,如partitionsForlistTopicsoffsetsForTimes, beginningOffsetsendOffsetsclose表示诚挚的一个Duration

  • 同时作为KIP-266的一部分,默认值request.timeout.ms已更改为30秒。之前的值略高于5分钟,以说明重新平衡所需的最长时间。现在我们将重新平衡中的JoinGroup请求视为一种特殊情况,并使用从max.poll.interval.ms请求超时派生的值 。所有其他请求类型都使用由定义的超时request.timeout.ms

  • 内部方法kafka.admin.AdminClient.deleteRecordsBefore已被删除。鼓励用户迁移到org.apache.kafka.clients.admin.AdminClient.deleteRecords

  • AclCommand工具--producer便捷选项在给定主题上使用KIP-277更细粒度的ACL。

  • KIP-176删除了--new-consumer所有基于消费者的工具的选项。此选项是多余的,因为如果定义了--bootstrap-server,则会自动使用新的使用者。

  • KIP-290增加了在前缀资源上定义ACL的功能,例如以'foo'开头的任何主题。

  • KIP-283改进了Kafka代理上的消息下转换处理,这通常是一个内存密集型操作。KIP添加了一种机制,通过该机制,通过一次下转换分区数据块来减少内存消耗,这有助于在内存消耗上设置上限。通过这种改进,FetchResponse协议行为发生了变化, 其中代理可以在响应结束时发送超大的消息批,并使用无效的偏移量。消费者客户必须忽略这种超大消息,就像这样做KafkaConsumer

    KIP-283还添加了新的主题和代理配置,message.downconversion.enablelog.message.downconversion.enable分别控制是否启用了下转换。禁用时,代理不会执行任何向下转换,而是向UNSUPPORTED_VERSION 客户端发送错误。

  • 在启动代理之前,可以使用kafka-configs.sh将动态代理配置选项存储在ZooKeeper中。此选项可用于避免在server.properties中存储明确的密码,因为所有密码配置都可以加密存储在ZooKeeper中。

  • 如果连接尝试失败,ZooKeeper主机现在会重新解析。但是,如果您的ZooKeeper主机名解析为多个地址,而其中一些地址无法访问,则可能需要增加连接超时zookeeper.connection.timeout.ms

新协议版本

  • KIP-279:OffsetsForLeaderEpochResponse v1引入了分区级leader_epoch字段。

  • KIP-219:将非群集操作请求和响应的协议版本更改为限制配额违规。

  • KIP-290:Bump up协议版本ACL创建,描述和删除请求和响应。

升级1.1 Kafka Streams应用程序

  • 将Streams应用程序从1.1升级到2.0不需要代理升级。Kafka Streams 2.0应用程序可以连接到2.0,1.1,1.0,0.11.0,0.10.2和0.10.1代理(但是不可能连接到0.10.0代理)。

  • 请注意,在2.0中,我们删除了在1.0之前弃用的公共API; 利用这些已弃用的API的用户需要相应地更改代码。有关更多详细信息,请参阅2.0.0中的Streams API更改

从0.8.x,0.9.x,0.10.0.x,0.10.1.x,0.10.2.x,0.11.0.x或1.0.x升级到1.1.x

Kafka 1.1.0引入了线程协议的变化。通过遵循下面建议的滚动升级计划,您可以保证在升级期间不会出现停机。但是,请在升级之前查看1.1.0中重大更改

对于滚动升级:

  1. 更新所有代理上的server.properties并添加以下属性。CURRENT_KAFKA_VERSION指的是您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION指的是当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果要从0.11.0.x之前的版本升级,则应将CURRENT_MESSAGE_FORMAT_VERSION设置为与CURRENT_KAFKA_VERSION匹配。如果要从0.11.0.x或1.0.x升级并且未覆盖消息格式,则只需覆盖代理间协议格式。

    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(0.11.0或1.0)。

    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2,0.9.0,0.10.0,0.10.1,0.10.2,0.11.0,1.0)。

    • log.message.format.version = CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

  2. 一次升级一个代理:关闭代理,更新代码并重新启动它。

  3. 升级整个群集后,通过编辑inter.broker.protocol.version并将其设置为1.1来破坏协议版本。

  4. 逐个重新启动代理以使新协议版本生效。

  5. 如果您已按照上面的说明覆盖了消息格式版本,则需要再执行一次滚动重新启动以将其升级到其最新版本。将所有(或大多数)使用者升级到0.11.0或更高版本后,在每个代理上将log.message.format.version更改为1.1并逐个重新启动它们。请注意,旧的Scala使用者不支持0.11中引入的新消息格式,因此为了避免下转换的性能成本(或者只利用一次语义),必须使用较新的Java使用者。

其他升级说明:

  1. 如果您愿意接受停机时间,您可以简单地关闭所有代理,更新代码并重新启动它们。默认情况下,它们将以新协议开始。

  2. 在升级代理后,可以随时进行协议版本的碰撞并重新启动。它不一定要立即。同样适用于消息格式版本。

  3. 如果您在Kafka Streams代码中使用Java8方法引用,则可能需要更新代码以解决方法限制。仅交换jar文件可能不起作用。

1.1.1中的显着变化

  • upgrade.from添加了新的Kafka Streams配置参数,允许从版本0.10.0.x滚动退回升级

  • 有关此新配置的详细信息,请参阅Kafka Streams升级指南

1.1.0中的显着变化

  • Maven中的kafka工件不再依赖于log4j或slf4j-log4j12。与kafka-clients工件类似,用户现在可以通过包含适当的slf4j模块(slf4j-log4j12,logback等)来选择日志记录后端。发行版tarball仍包含log4j和slf4j-log4j12。

  • KIP-225将度量标准“records.lag”更改为使用主题和分区标记。名称格式为“{topic} - {partition} .records-lag”的原始版本已弃用,将在2.0.0中删除。

  • Kafka Streams更能抵御代理通信错误。Kafka Streams尝试自我修复并重新连接到群集,而不是停止Kafka Streams客户端的致命异常。使用新的,AdminClient您可以更好地控制Kafka Streams重试的频率,并可以配置 细粒度的超时(而不是旧版本中的硬编码重试)。

  • Kafka Streams重新平衡时间进一步减少,使Kafka Streams更具响应性。

  • Kafka Connect现在支持接收器和源接口中的消息头,并通过简单的消息转换来操作它们。必须更改连接器以明确使用它们。HeaderConverter引入了一个new 来控制头(de)序列化的方式,默认情况下使用新的“SimpleHeaderConverter”来使用值的字符串表示。

  • kafka.tools.DumpLogSegments现在自动设置深度迭代选项,如果由于解码器等任何其他选项而显式或隐式启用了print-data-log。

新协议版本

  • KIP-226引入了DescribeConfigs请求/响应v1。

  • KIP-227引入了获取请求/响应v7。

升级1.0 Kafka Streams应用程序

  • 将Streams应用程序从1.0升级到1.1不需要代理升级。Kafka Streams 1.1应用程序可以连接到1.0,0.11.0,0.10.2和0.10.1代理(但是不可能连接到0.10.0代理)。

  • 有关更多详细信息,请参阅1.1.0中的Streams API更改

从0.8.x,0.9.x,0.10.0.x,0.10.1.x,0.10.2.x或0.11.0.x升级到1.0.0

Kafka 1.0.0引入了线程协议的变化。通过遵循下面建议的滚动升级计划,您可以保证在升级期间不会出现停机。但是,请在升级之前查看1.0.0中显着更改

对于滚动升级:

  1. 更新所有代理上的server.properties并添加以下属性。CURRENT_KAFKA_VERSION指的是您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION指的是当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果要从0.11.0.x之前的版本升级,则应将CURRENT_MESSAGE_FORMAT_VERSION设置为与CURRENT_KAFKA_VERSION匹配。如果要从0.11.0.x升级并且未覆盖消息格式,则必须将消息格式版本和代理间协议版本都设置为0.11.0。

    • inter.broker.protocol.version = 0.11.0

    • log.message.format.version = 0.11.0

    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2,0.9.0,0.10.0,0.10.1,0.10.2,0.11.0)。

    • log.message.format.version = CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

  2. 一次升级一个代理:关闭代理,更新代码并重新启动它。

  3. 升级整个群集后,通过编辑inter.broker.protocol.version并将其设置为1.0来破坏协议版本。

  4. 逐个重新启动代理以使新协议版本生效。

  5. 如果您已按照上面的说明覆盖了消息格式版本,则需要再执行一次滚动重新启动以将其升级到其最新版本。将所有(或大多数)使用者升级到0.11.0或更高版本后,在每个代理上将log.message.format.version更改为1.0并逐个重新启动它们。如果要从0.11.0升级并将log.message.format.version设置为0.11.0,则可以更新配置并跳过滚动重新启动。请注意,旧的Scala使用者不支持0.11中引入的新消息格式,因此为了避免下转换的性能成本(或者只利用一次语义),必须使用较新的Java使用者。

其他升级说明:

  1. 如果您愿意接受停机时间,您可以简单地关闭所有代理,更新代码并重新启动它们。默认情况下,它们将以新协议开始。

  2. 在升级代理后,可以随时进行协议版本的碰撞并重新启动。它不一定要立即。同样适用于消息格式版本。

1.0.2中的显着变化

  • upgrade.from添加了新的Kafka Streams配置参数,允许从版本0.10.0.x滚动退回升级

  • 有关此新配置的详细信息,请参阅Kafka Streams升级指南

1.0.1中的显着变化

  • 使用0.11.0.x恢复AdminClient的Options类(例如CreateTopicsOptions,DeleteTopicsOptions等)的二进制兼容性。在1.0.0中无意中破坏了二进制(但不是源)兼容性。

1.0.0中的显着变化

  • 现在默认启用主题删除,因为功能现在已稳定。希望保留先前行为的用户应将代理配置设置delete.topic.enablefalse。请记住,删除主题会删除数据并且操作不可逆(即没有“取消删除”操作)

  • 对于支持时间戳搜索的主题,如果找不到分区的偏移量,则该分区现在包含在具有空偏移值的搜索结果中。以前,分区未包含在地图中。进行此更改是为了使搜索行为与不支持时间戳搜索的主题的情况一致。

  • 如果inter.broker.protocol.version是1.0或更高版本,即使存在脱机日志目录,代理现在也将保持联机以在活动日志目录上提供副本。由于硬件故障导致IOException,日志目录可能会脱机。用户需要监视每个代理度量标准offlineLogDirectoryCount以检查是否存在脱机日志目录。

  • 添加了KafkaStorageException,这是一个可重试的例外。如果客户端的FetchRequest或ProducerRequest版本不支持KafkaStorageException,则KafkaStorageException将在响应中转换为NotLeaderForPartitionException。

  • -XX:+ DisableExplicitGC在默认JVM设置中被-XX:+ ExplicitGCInvokesConcurrent替换。在某些情况下,这有助于避免在通过直接缓冲区分配本机内存期间出现内存不足异常。

  • 重写的handleError方法实现已经从以下过时类中除去kafka.api:包FetchRequestGroupCoordinatorRequestOffsetCommitRequestOffsetFetchRequestOffsetRequestProducerRequest,和TopicMetadataRequest。这仅用于代理,但它已不再使用,并且尚未维护实现。已保留存根实现以实现二进制兼容性。

  • Java客户端和工具现在接受任何字符串作为客户端ID。

  • 已弃用的工具kafka-consumer-offset-checker.sh已被删除。使用kafka-consumer-groups.sh得到消费群的详细信息。

  • SimpleAclAuthorizer现在默认将访问拒绝记录到授权者日志。

  • 现在,身份验证失败作为其中一个子类报告给客户端AuthenticationException。如果客户端连接验证失败,则不会执行重试。

  • 自定义SaslServer实现可能会抛出SaslAuthenticationException提供错误消息以返回到客户端,指示身份验证失败的原因。实现者应注意不要在异常消息中包含任何不应泄漏给未经身份验证的客户端的安全关键信息。

  • app-infoJMX中注册以提供版本和提交ID 的mbean将被弃用,并替换为提供这些属性的度量标准。

  • Kafka指标现在可能包含非数字值。org.apache.kafka.common.Metric#value()已被弃用,并将0.0在这种情况下返回,以最大限度地减少破坏读取每个客户端度量值的用户的可能性(通过MetricsReporter实现或通过调用metrics()方法)。 org.apache.kafka.common.Metric#metricValue()可用于检索数字和非数字度量标准值。

  • 现在,每个Kafka费率指标都有一个相应的累积计数指标,后缀-total 用于简化下游处理。例如,records-consumed-rate具有相应的度量标准records-consumed-total

  • 仅当系统属性kafka_mx4jenable设置为时,才会启用Mx4j true。由于逻辑反转错误,它先前默认启用,如果kafka_mx4jenable设置为禁用则禁用true

  • org.apache.kafka.common.security.auth客户端jar中的包已公开并添加到javadocs中。先前位于此包中的内部类已移至其他位置。

  • 使用Authorizer并且用户对主题没有必需的权限时,代理将向请求返回TOPIC_AUTHORIZATION_FAILED错误,而不管代理上是否存在主题。如果用户具有所需权限且主题不存在,则将返回UNKNOWN_TOPIC_OR_PARTITION错误代码。

  • 更新config / consumer.properties文件以使用新的使用者配置属性。

新协议版本

  • KIP-112:LeaderAndIsrRequest v1引入了分区级is_new字段。

  • KIP-112:UpdateMetadataRequest v4引入了分区级offline_replicas字段。

  • KIP-112:MetadataResponse v5引入了分区级offline_replicas字段。

  • KIP-112:ProduceResponse v4引入了KafkaStorageException的错误代码。

  • KIP-112:FetchResponse v6引入了KafkaStorageException的错误代码。

  • KIP-152:添加了SaslAuthenticate请求以启用身份验证失败报告。如果SaslHandshake请求版本大于0,将使用此请求。

升级0.11.0 Kafka Streams应用程序

  • 将Streams应用程序从0.11.0升级到1.0不需要代理升级。Kafka Streams 1.0应用程序可以连接到0.11.0,0.10.2和0.10.1代理(但是不可能连接到0.10.0代理)。但是,Kafka Streams 1.0需要0.10或更新的消息格式,并且不适用于较旧的消息格式。

  • 如果要监视流量度量标准,则需要对报告和监视代码中的度量标准名称进行一些更改,因为度量标准传感器层次结构已更改。

  • 有一些公共的API,包括ProcessorContext#schedule()Processor#punctuate()KStreamBuilderTopologyBuilder正在被新的API弃用。我们建议您进行相应的代码更改,这些更改应该非常小,因为升级时新API看起来非常相似。

  • 有关更多详细信息,请参阅1.0.0中的Streams API更改

升级0.10.2 Kafka Streams应用程序

  • 将Streams应用程序从0.10.2升级到1.0不需要代理升级。Kafka Streams 1.0应用程序可以连接到1.0,0.11.0,0.10.2和0.10.1代理(但是不可能连接到0.10.0代理)。

  • 如果要监视流量度量标准,则需要对报告和监视代码中的度量标准名称进行一些更改,因为度量标准传感器层次结构已更改。

  • 有一些公共的API,包括ProcessorContext#schedule()Processor#punctuate()KStreamBuilderTopologyBuilder正在被新的API弃用。我们建议您进行相应的代码更改,这些更改应该非常小,因为升级时新API看起来非常相似。

  • 如果指定自定义key.serdevalue.serde并且timestamp.extractor在配置中,建议使用其替换的configure参数,因为不推荐使用这些配置。

  • 有关详细信息,请参阅0.11.0中的Streams API更改

升级0.10.1 Kafka Streams应用程序

  • 将Streams应用程序从0.10.1升级到1.0不需要代理升级。Kafka Streams 1.0应用程序可以连接到1.0,0.11.0,0.10.2和0.10.1代理(但是不可能连接到0.10.0代理)。

  • 您需要重新编译代码。只是交换Kafka Streams库jar文件将无法正常工作,并将破坏您的应用程序。

  • 如果要监视流量度量标准,则需要对报告和监视代码中的度量标准名称进行一些更改,因为度量标准传感器层次结构已更改。

  • 有一些公共的API,包括ProcessorContext#schedule()Processor#punctuate()KStreamBuilderTopologyBuilder正在被新的API弃用。我们建议您进行相应的代码更改,这些更改应该非常小,因为升级时新API看起来非常相似。

  • 如果指定自定义key.serdevalue.serde并且timestamp.extractor在配置中,建议使用其替换的configure参数,因为不推荐使用这些配置。

  • 如果使用自定义(即用户实现的)时间戳提取程序,则需要更新此代码,因为TimestampExtractor接口已更改。

  • 如果您注册自定义指标,则需要更新此代码,因为StreamsMetric界面已更改。

  • 有关更多详细信息,请参阅1.0.0中的 Streams API更改,0.11.0中的Streams API更改和 0.10.2中的Streams API更改

升级0.10.0 Kafka Streams应用程序

  • 将Streams应用程序从0.10.0升级到1.0确实需要代理升级,因为Kafka Streams 1.0应用程序只能连接到0.1,0.11,0.10.2或0.10.1代理。

  • 有几个API的变化,这是不向后兼容(参见流API中1.0.0的变化, 在0.11.0流API的变化, 在0.10.2流API的变化,和 流API的变化在0.10.1了解更多详情)。因此,您需要更新并重新编译代码。只是交换Kafka Streams库jar文件将无法正常工作,并将破坏您的应用程序。

  • 从0.10.0.x升级到1.0.2需要两次滚动跳出,upgrade.from="0.10.0"并为第一个升级阶段设置配置(参见KIP-268)。作为替代方案,也可以进行离线升级。

    • 为滚动跳动准备应用程序实例,并确保将配置upgrade.from设置"0.10.0"为新版本0.11.0.3

    • 一次退回应用程序的每个实例

    • 准备新部署的1.0.2应用程序实例以进行第二轮滚动跳动; 确保删除config的值upgrade.mode

    • 再次退回应用程序的每个实例以完成升级

  • 从0.10.0.x升级到1.0.0或1.0.1需要脱机升级(不支持滚动退回升级)

    • 停止所有旧的(0.10.0.x)应用程序实例

    • 更新您的代码并使用新代码和新的jar文件交换旧代码和jar文件

    • 重新启动所有新的(1.0.0或1.0.1)应用程序实例

从0.8.x,0.9.x,0.10.0.x,0.10.1.x或0.10.2.x升级到0.11.0.0

Kafka 0.11.0.0引入了新的消息格式版本以及有线协议更改。通过遵循下面建议的滚动升级计划,您可以保证在升级期间不会出现停机。但是,请在升级之前查看0.11.0.0中重大更改

从版本0.10.2开始,Java客户端(生产者和消费者)已经获得了与旧代理进行通信的能力。版本0.11.0客户端可以与版本0.10.0或更新的代理进行通信。但是,如果您的代理超过0.10.0,则必须先升级Kafka群集中的所有代理,然后再升级客户端。版本0.11.0经纪商支持0.8.x和更新的客户。

对于滚动升级:

  1. 更新所有代理上的server.properties并添加以下属性。CURRENT_KAFKA_VERSION指的是您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION是指当前使用的当前消息格式版本。如果您之前未覆盖消息格式,则应将CURRENT_MESSAGE_FORMAT_VERSION设置为与CURRENT_KAFKA_VERSION匹配。

    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2,0.9.0,0.10.0,0.10.1或0.10.2)。

    • log.message.format.version = CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

  2. 一次升级一个代理:关闭代理,更新代码并重新启动它。

  3. 升级整个群集后,通过编辑inter.broker.protocol.version并将其设置为0.11.0来突破协议版本,但不要更改log.message.format.version

  4. 逐个重新启动代理以使新协议版本生效。

  5. 一旦所有(或大多数)消费者升级到0.11.0或更高版本,然后在每个代理上将log.message.format.version更改为0.11.0并逐个重新启动它们。请注意,旧的Scala使用者不支持新的消息格式,因此为了避免下转换的性能成本(或者只利用一次语义),必须使用新的Java使用者。

其他升级说明:

  1. 如果您愿意接受停机时间,您可以简单地关闭所有代理,更新代码并重新启动它们。默认情况下,它们将以新协议开始。

  2. 在升级代理后,可以随时进行协议版本的碰撞并重新启动。它不一定要立即。同样适用于消息格式版本。

  3. bin/kafka-topics.sh在更新全局设置之前,还可以使用主题管理工具()在各个主题上启用0.11.0消息格式log.message.format.version

  4. 如果要从0.10.0之前的版本升级,则在切换到0.11.0之前,无需先将消息格式更新为0.10.0。

升级0.10.2 Kafka Streams应用程序

  • 将Streams应用程序从0.10.2升级到0.11.0不需要代理升级。Kafka Streams 0.11.0应用程序可以连接到0.11.0,0.10.2和0.10.1代理(但是不可能连接到0.10.0代理)。

  • 如果指定自定义key.serdevalue.serde并且timestamp.extractor在配置中,建议使用其替换的configure参数,因为不推荐使用这些配置。

  • 有关详细信息,请参阅0.11.0中的Streams API更改

升级0.10.1 Kafka Streams应用程序

  • 将Streams应用程序从0.10.1升级到0.11.0不需要代理升级。Kafka Streams 0.11.0应用程序可以连接到0.11.0,0.10.2和0.10.1代理(但是不可能连接到0.10.0代理)。

  • 您需要重新编译代码。只是交换Kafka Streams库jar文件将无法正常工作,并将破坏您的应用程序。

  • 如果指定自定义key.serdevalue.serde并且timestamp.extractor在配置中,建议使用其替换的configure参数,因为不推荐使用这些配置。

  • 如果使用自定义(即用户实现的)时间戳提取程序,则需要更新此代码,因为TimestampExtractor接口已更改。

  • 如果您注册自定义指标,则需要更新此代码,因为StreamsMetric界面已更改。

  • 有关详细信息,请参阅0.11.0中的Streams API更改和 0.10.2中的Streams API更改

升级0.10.0 Kafka Streams应用程序

  • 将Streams应用程序从0.10.0升级到0.11.0确实需要代理升级,因为Kafka Streams 0.11.0应用程序只能连接到0.11.0,0.10.2或0.10.1代理。

  • 有几个API的变化,这是不向后兼容(参见在0.11.0流API的变化, 在0.10.2流API的变化,并 在0.10.1流API的变化有详细介绍)。因此,您需要更新并重新编译代码。只是交换Kafka Streams库jar文件将无法正常工作,并将破坏您的应用程序。

  • 从0.10.0.x升级到0.11.0.3需要两次滚动跳出,upgrade.from="0.10.0"并为第一个升级阶段设置配置(参见KIP-268)。作为替代方案,也可以进行离线升级。

    • 为滚动跳动准备应用程序实例,并确保将配置upgrade.from设置"0.10.0"为新版本0.11.0.3

    • 一次退回应用程序的每个实例

    • 准备新部署的0.11.0.3应用程序实例以进行第二轮滚动跳动; 确保删除config的值upgrade.mode

    • 再次退回应用程序的每个实例以完成升级

  • 从0.10.0.x升级到0.11.0.0,0.11.0.1或0.11.0.2需要脱机升级(不支持滚动退回升级)

    • 停止所有旧的(0.10.0.x)应用程序实例

    • 更新您的代码并使用新代码和新的jar文件交换旧代码和jar文件

    • 重新启动所有新的(0.11.0.0,0.11.0.1或0.11.0.2)应用程序实例

0.11.0.3中的显着变化

  • upgrade.from添加了新的Kafka Streams配置参数,允许从版本0.10.0.x滚动退回升级

  • 有关此新配置的详细信息,请参阅Kafka Streams升级指南

0.11.0.0中的显着变化

  • 现在默认禁用不洁领导者选举。新的默认值有利于可用性的持久性。希望保留先前行为的用户应将代理配置设置unclean.leader.election.enabletrue

  • 制片人配置block.on.buffer.fullmetadata.fetch.timeout.mstimeout.ms已被删除。它们最初在Kafka 0.9.0.0中被弃用。

  • offsets.topic.replication.factor券商的配置现在在强制汽车主题产生。在群集大小满足此复制因子要求之前,内部自动主题创建将失败并出现GROUP_COORDINATOR_NOT_AVAILABLE错误。

  • 使用snappy压缩数据时,生产者和代理将使用压缩方案的默认块大小(2 x 32 KB)而不是1 KB,以提高压缩率。有报道称,压缩数据的较小块大小比使用较大块大小压缩时大50%。对于snappy案例,具有5000个分区的生产者将需要额外的315 MB JVM堆。

  • 类似地,当使用gzip压缩数据时,生产者和代理将使用8 KB而不是1 KB作为缓冲区大小。gzip的默认值过低(512字节)。

  • 代理配置max.message.bytes现在适用于一批消息的总大小。以前,该设置适用于批量压缩消息或单独应用于非压缩消息。消息批处理可能只包含一条消息,因此在大多数情况下,单个消息大小的限制只会减少批处理格式的开销。但是,消息格式转换有一些微妙的含义(更多细节见下文)。还要注意,虽然先前代理将确保在每个获取请求中返回至少一条消息(无论总数和分区级提取大小如何),但现在相同的行为适用于一个消息批处理。

  • 默认情况下启用GC日志旋转,有关详细信息,请参阅KAFKA-3754。

  • 已删除RecordMetadata,MetricName和Cluster类的不推荐构造函数。

  • 通过新的Headers接口添加了用户头支持,提供用户头读取和写入访问。

  • ProducerRecord和ConsumerRecord通过Headers headers()方法调用公开新的Headers API 。

  • 引入了ExtendedSerializer和ExtendedDeserializer接口以支持标头的序列化和反序列化。如果配置的序列化器和反序列化器不是上述类,则将忽略标头。

  • group.initial.rebalance.delay.ms引入了一个新的配置。此配置指定GroupCoordinator将延迟初始消费者重新平衡的时间(以毫秒为单位)。group.initial.rebalance.delay.ms随着新成员加入集团的价值,再平衡将进一步延迟,最多可达到max.poll.interval.ms。默认值为3秒。在开发和测试期间,可能需要将其设置为0,以便不延迟测试执行时间。

  • org.apache.kafka.common.Cluster#partitionsForTopicpartitionsForNodeavailablePartitionsForTopic方法会返回一个空列表,而不是null在的情况下(这被认为是不好的做法)的元数据所要求的主题不存在。

  • 流API的配置参数timestamp.extractorkey.serde以及value.serde被弃用,替换default.timestamp.extractordefault.key.serdedefault.value.serde分别。

  • 对于Java使用者commitAsyncAPI 中的偏移提交失败,我们不再在将实例RetriableCommitFailedException传递给提交回调时暴露潜在原因。有关 详细信息,请参阅 KAFKA-5052

新协议版本

  • KIP-107:FetchRequest v5引入了分区级log_start_offset字段。

  • KIP-107:FetchResponse v5引入了分区级log_start_offset字段。

  • KIP-82:ProduceRequest v3 header在消息协议中引入了一个包含key字段和value字段的数组。

  • KIP-82:FetchResponse v5 header在消息协议中引入了一个包含key字段和value字段的数组。

关于完全一次语义的注释

Kafka 0.11.0包括对生产者中的幂等和事务功能的支持。幂等传递确保在单个生产者的生命周期内将消息一次性传递到特定主题分区。事务传递允许生产者将数据发送到多个分区,以便所有消息都成功传递,或者都不传递。这些功能共同实现了Kafka中的“一次语义”。用户指南中提供了有关这些功能的更多详细信息,但下面我们将添加一些有关在升级后的群集中启用它们的具体说明。请注意,不需要启用EoS,如果未使用,则不会影响代理的行为。

  • 只有新的Java生产者和消费者支持一次语义。

  • 这些功能主要取决于0.11.0消息格式。尝试在较旧的格式上使用它们将导致不受支持的版本错误。

  • 事务状态存储在新的内部主题中__transaction_state。在第一次尝试使用事务请求API之前,不会创建此主题。与使用者偏移主题类似,有几种设置可用于控制主题的配置。例如,transaction.state.log.min.isr控制此主题的最小ISR。有关选项的完整列表,请参阅用户指南中的配置部分。

  • 对于安全集群,事务API需要新的ACL,可以使用bin/kafka-acls.sh。工具。

  • Kafka中的EoS引入了新的请求API并修改了几个现有API。有关 详细信息,请参阅 KIP-98

有关0.11.0中新消息格式的说明

0.11.0消息格式包括几个主要的增强功能,以支持生产者更好的传递语义(参见KIP-98)和改进的复制容错(参见KIP-101)。虽然新格式包含更多信息以使这些改进成为可能,但我们使批处理格式更加高效。只要每批邮件的数量超过2,您就可以期望降低总体开销。但是,对于较小的批次,可能会产生较小的性能影响。请参阅此处,了解新消息格式的初始性能分析结果。您还可以在KIP-98提案中找到有关消息格式的更多详细信息 。

新消息格式的一个显着差异是即使未压缩的消息也作为单个批处理存储在一起。这对代理配置有一些影响max.message.bytes,它限制了单个批处理的大小。首先,如果较旧的客户端使用旧格式向主题分区生成消息,并且消息单独小于 max.message.bytes,则代理在上转换过程中合并为单个批次后仍可拒绝它们。通常,当单个消息的聚合大小大于时,可能会发生这种情况max.message.bytes。对于阅读从新格式下转换的邮件的旧消费者,也会产生类似的效果:如果获取的大小设置不至少与max.message.bytes即使各个未压缩消息小于配置的提取大小,消费者也可能无法取得进展。此行为不会影响0.10.1.0及更高版本的Java客户端,因为它使用更新的提取协议,确保即使超过提取大小也至少可以返回一条消息。要解决这些问题,您应该确保1)生产者的批量大小未设置为大于max.message.bytes,以及2)消费者的提取大小至少设置为大小max.message.bytes

关于升级到0.10.0消息格式的性能影响的大多数讨论 仍然与0.11.0升级有关。这主要影响不受TLS保护的集群,因为在这种情况下已经不可能进行“零拷贝”传输。为了避免向下转换的成本,您应该确保将使用者应用程序升级到最新的0.11.0客户端。值得注意的是,由于旧的消费者已经在0.11.0.0中弃用,因此它不支持新的消息格式。您必须升级才能使用新的使用者使用新的邮件格式,而无需降低转换成本。请注意,0.11.0使用者支持向后兼容0.10.0代理和向上,因此可以在代理之前首先升级客户端。

从0.8.x,0.9.x,0.10.0.x或0.10.1.x升级到0.10.2.0

0.10.2.0有线协议更改。通过遵循下面建议的滚动升级计划,您可以保证在升级期间不会出现停机。但是,请在升级之前查看0.10.2.0中重大更改

从版本0.10.2开始,Java客户端(生产者和消费者)已经获得了与旧代理进行通信的能力。版本0.10.2客户端可以与版本0.10.0或更新的代理进行通信。但是,如果您的代理超过0.10.0,则必须先升级Kafka群集中的所有代理,然后再升级客户端。版本0.10.2代理支持0.8.x和更新的客户端。

对于滚动升级:

  1. 更新所有代理上的server.properties文件并添加以下属性:

    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2,0.9.0,0.10.0或0.10.1)。

    • log.message.format.version = CURRENT_KAFKA_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

  2. 一次升级一个代理:关闭代理,更新代码并重新启动它。

  3. 升级整个群集后,通过编辑inter.broker.protocol.version并将其设置为0.10.2来破坏协议版本。

  4. 如果您之前的消息格式为0.10.0,请将log.message.format.version更改为0.10.2(这是一个无操作,因为消息格式对于0.10.0,0.10.1和0.10.2是相同的)。如果您以前的消息格式版本低于0.10.0,请不要更改log.message.format.version - 只有在所有使用者升级到0.10.0.0或更高版本后,此参数才会更改。

  5. 逐个重新启动代理以使新协议版本生效。

  6. 如果此时log.message.format.version仍然低于0.10.0,请等待所有使用者升级到0.10.0或更高版本,然后在每个代理上将log.message.format.version更改为0.10.2并且一个接一个地重启它们。

注意:如果您愿意接受停机时间,您可以简单地关闭所有代理,更新代码并启动所有代理。默认情况下,它们将以新协议开始。

注意:升级协议版本并重新启动可以在升级代理后随时进行。它不一定要立即。

升级0.10.1 Kafka Streams应用程序

  • 将Streams应用程序从0.10.1升级到0.10.2不需要代理升级。Kafka Streams 0.10.2应用程序可以连接到0.10.2和0.10.1代理(但是不可能连接到0.10.0代理)。

  • 您需要重新编译代码。只是交换Kafka Streams库jar文件将无法正常工作,并将破坏您的应用程序。

  • 如果使用自定义(即用户实现的)时间戳提取程序,则需要更新此代码,因为TimestampExtractor接口已更改。

  • 如果您注册自定义指标,则需要更新此代码,因为StreamsMetric界面已更改。

  • 有关详细信息,请参阅0.10.2中的Streams API更改

升级0.10.0 Kafka Streams应用程序

  • 将Streams应用程序从0.10.0升级到0.10.2确实需要代理升级,因为Kafka Streams 0.10.2应用程序只能连接到0.10.2或0.10.1代理。

  • 有两个API更改,这些更改不向后兼容(有关详细信息,请参阅0.10.2中的Streams API更改)。因此,您需要更新并重新编译代码。只是交换Kafka Streams库jar文件将无法正常工作,并将破坏您的应用程序。

  • 从0.10.0.x升级到0.10.2.2需要两次滚动跳出,upgrade.from="0.10.0"并为第一个升级阶段设置配置(参见KIP-268)。作为替代方案,也可以进行离线升级。

    • 为滚动跳动准备应用程序实例,并确保将配置upgrade.from设置"0.10.0"为新版本0.10.2.2

    • 一次退回应用程序的每个实例

    • 准备新部署的0.10.2.2应用程序实例以进行第二轮滚动跳动; 确保删除config的值upgrade.mode

    • 再次退回应用程序的每个实例以完成升级

  • 从0.10.0.x升级到0.10.2.0或0.10.2.1需要脱机升级(不支持滚动退回升级)

    • 停止所有旧的(0.10.0.x)应用程序实例

    • 更新您的代码并使用新代码和新的jar文件交换旧代码和jar文件

    • 重新启动所有新的(0.10.2.0或0.10.2.1)应用程序实例

0.10.2.2中的显着变化

  • upgrade.from添加了新配置参数,允许从版本0.10.0.x滚动退回升级

0.10.2.1中的显着变化

  • 更改了两个StreamsConfig类配置的默认值,以提高Kafka Streams应用程序的弹性。内部Kafka Streams生成器retries默认值从0更改为10.内部Kafka Streams使用者max.poll.interval.ms 默认值已从300000更改为Integer.MAX_VALUE

0.10.2.0中的显着变化

  • Java客户端(生产者和消费者)已经获得了与较早的经纪人进行通信的能力。版本0.10.2客户端可以与版本0.10.0或更新的代理进行通信。请注意,某些功能在使用较旧的代理时不可用或受到限制。

  • InterruptException如果调用线程被中断,Java消费者可能会抛出几个方法。KafkaConsumer有关此更改的更深入说明,请参阅Javadoc。

  • Java消费者现在优雅地关闭。默认情况下,使用者最多等待30秒才能完成挂起的请求。添加了一个带有超时的新关闭API KafkaConsumer来控制最长等待时间。

  • 由逗号分隔的多个正则表达式可以通过--whitelist选项与新Java使用者一起传递给MirrorMaker。当使用旧的Scala使用者时,这使得行为与MirrorMaker一致。

  • 将Streams应用程序从0.10.1升级到0.10.2不需要代理升级。Kafka Streams 0.10.2应用程序可以连接到0.10.2和0.10.1代理(但是不可能连接到0.10.0代理)。

  • 从Streams API中删除了Zookeeper依赖项。Streams API现在使用Kafka协议来管理内部主题,而不是直接修改Zookeeper。这消除了直接访问Zookeeper的特权的需要,并且不再在Streams应用程序中设置“StreamsConfig.ZOOKEEPER_CONFIG”。如果Kafka群集受到保护,Streams应用程序必须具有创建新主题所需的安全权限。

  • 在StreamsConfig类中添加了几个新字段,包括“security.protocol”,“connections.max.idle.ms”,“retry.backoff.ms”,“reconnect.backoff.ms”和“request.timeout.ms”。用户应注意默认值,并在需要时设置这些值。有关更多详细信息,请参阅3.5 Kafka Streams配置

新协议版本

  • KIP-88:如果topics阵列设置为,OffsetFetchRequest v2支持检索所有主题的偏移量null

  • KIP-88:OffsetFetchResponse v2引入了顶级error_code字段。

  • KIP-103:UpdateMetadataRequest v3 listener_nameend_points数组元素引入了一个字段。

  • KIP-108:CreateTopicsRequest v1引入了一个validate_only字段。

  • KIP-108:CreateTopicsResponse v1 error_messagetopic_errors数组元素引入了一个字段。

从0.8.x,0.9.x或0.10.0.X升级到0.10.1.0

0.10.1.0有线协议更改。通过遵循下面建议的滚动升级计划,您可以保证在升级期间不会出现停机。但是,请注意升级前0.10.1.0中潜在破坏性变化。 
注意:由于引入了新协议,因此在升级客户端之前升级Kafka群集非常重要(即0.10.1.x客户端仅支持0.10.1.x或更高版本的代理,而0.10.1.x代理也支持较旧的客户端) 。

对于滚动升级:

  1. 更新所有代理上的server.properties文件并添加以下属性:

    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2.0,0.9.0.0或0.10.0.0)。

    • log.message.format.version = CURRENT_KAFKA_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

  2. 一次升级一个代理:关闭代理,更新代码并重新启动它。

  3. 升级整个群集后,通过编辑inter.broker.protocol.version并将其设置为0.10.1.0来破坏协议版本。

  4. 如果您之前的消息格式为0.10.0,请将log.message.format.version更改为0.10.1(这是无操作,因为0.10.0和0.10.1的消息格式相同)。如果您以前的消息格式版本低于0.10.0,请不要更改log.message.format.version - 只有在所有使用者升级到0.10.0.0或更高版本后,此参数才会更改。

  5. 逐个重新启动代理以使新协议版本生效。

  6. 如果此时log.message.format.version仍然低于0.10.0,请等待所有使用者升级到0.10.0或更高版本,然后在每个代理上将log.message.format.version更改为0.10.1并且一个接一个地重启它们。

注意:如果您愿意接受停机时间,您可以简单地关闭所有代理,更新代码并启动所有代理。默认情况下,它们将以新协议开始。

注意:升级协议版本并重新启动可以在升级代理后随时进行。它不一定要立即。

潜在的破裂变化在0.10.1.0

  • 日志保留时间不再基于日志段的上次修改时间。相反,它将基于日志段中消息的最大时间戳。

  • 日志滚动时间不再取决于日志段创建时间。相反,它现在基于消息中的时间戳。进一步来说。如果段中第一条消息的时间戳为T,则当新消息的时间戳大于或等于T + log.roll.ms时,将推出日志

  • 由于为每个段添加了时间索引文件,因此0.10.0的打开文件处理程序将增加~33%。

  • 时间索引和偏移索引共享相同的索引大小配置。由于每次索引条目的大小是偏移索引条目的1.5倍。用户可能需要增加log.index.size.max.bytes以避免潜在的频繁日志滚动。

  • 由于索引文件数量的增加,在一些具有大量日志段(例如> 15K)的代理上,代理启动期间的日志加载过程可能会更长。根据我们的实验,将num.recovery.threads.per.data.dir设置为1可能会减少日志加载时间。

升级0.10.0 Kafka Streams应用程序

  • 将Streams应用程序从0.10.0升级到0.10.1确实需要代理升级,因为Kafka Streams 0.10.1应用程序只能连接到0.10.1代理。

  • 有两个API更改,这些更改不向后兼容(有关详细信息,请参阅0.10.1中的Streams API更改)。因此,您需要更新并重新编译代码。只是交换Kafka Streams库jar文件将无法正常工作,并将破坏您的应用程序。

  • 从0.10.0.x升级到0.10.1.2需要两次滚动跳出,upgrade.from="0.10.0"并在第一次升级阶段配置设置(参见KIP-268)。作为替代方案,也可以进行离线升级。

    • 为滚动跳动准备应用程序实例,并确保将配置upgrade.from设置"0.10.0"为新版本0.10.1.2

    • 一次退回应用程序的每个实例

    • 准备新部署的0.10.1.2应用程序实例以进行第二轮滚动跳动; 确保删除config的值upgrade.mode

    • 再次退回应用程序的每个实例以完成升级

  • 从0.10.0.x升级到0.10.1.0或0.10.1.1需要脱机升级(不支持滚动退回升级)

    • 停止所有旧的(0.10.0.x)应用程序实例

    • 更新您的代码并使用新代码和新的jar文件交换旧代码和jar文件

    • 重新启动所有新的(0.10.1.0或0.10.1.1)应用程序实例

0.10.1.0中的显着变化

  • 新的Java消费者不再处于测试阶段,我们建议将其用于所有新开发。旧的Scala消费者仍然受到支持,但它们将在下一版本中弃用,并将在未来的主要版本中删除。

  • --new-consumer--new.consumer开关不再需要使用像MirrorMaker和控制台消费者与消费者的新工具; 一个人只需要传递一个Kafka代理来连接而不是ZooKeeper集合。此外,已弃用对旧消费者的控制台消费者的使用,并将在未来的主要版本中将其删除。

  • 现在可以通过群集ID唯一标识Kafka群集。当代理升级到0.10.1.0时,它将自动生成。群集ID可通过kafka.server获得:type = KafkaServer,name = ClusterId metric,它是元数据响应的一部分。序列化程序,客户端拦截器和度量标准报告程序可以通过实现ClusterResourceListener接口来接收集群标识。

  • BrokerState“RunningAsController”(值4)已被删除。由于存在错误,经纪人只能在转出之前暂时处于此状态,因此移除的影响应该是最小的。检测给定代理是否为控制器的推荐方法是通过kafka.controller:type = KafkaController,name = ActiveControllerCount metric。

  • 新的Java Consumer现在允许用户按分区上的时间戳搜索偏移量。

  • 新的Java Consumer现在支持后台线程的心跳。max.poll.interval.ms在消费者主动离开组之前,有一个新配置 控制轮询调用之间的最长时间(默认为5分钟)。配置的值 request.timeout.ms必须始终大于max.poll.interval.ms因为这是在消费者重新平衡时JoinGroup请求可以在服务器上阻塞的最长时间,因此我们将其默认值更改为刚好超过5分钟。最后,默认值session.timeout.ms已调整为10秒,默认值max.poll.records已更改为500。

  • 使用Authorizer且用户没有对主题进行描述授权时,代理将不再向请求返回TOPIC_AUTHORIZATION_FAILED错误,因为这会泄漏主题名称。而是返回UNKNOWN_TOPIC_OR_PARTITION错误代码。这可能会在使用生产者和消费者时导致意外超时或延迟,因为Kafka客户端通常会在未知主题错误时自动重试。如果您怀疑可能发生这种情况,则应查阅客户端日志。

  • 默认情况下,获取响应具有大小限制(对于使用者为50 MB,对于复制为10 MB)。现有的每个分区限制也适用(消费者和复制为1 MB)。请注意,这些限制都不是绝对最大值,如下一点所述。

  • 如果找到大于响应/分区大小限制的消息,则消费者和副本可以取得进展。更具体地说,如果获取的第一个非空分区中的第一条消息大于其中一个或两个限制,则仍将返回该消息。

  • 重载的构造函数被添加到kafka.api.FetchRequestkafka.javaapi.FetchRequest允许调用者指定分区的顺序(因为在v3中顺序很重要)。不推荐使用先前存在的构造函数,并在发送请求之前对分区进行洗牌以避免饥饿问题。

新协议版本

  • ListOffsetRequest v1支持基于时间戳的准确偏移搜索。

  • MetadataResponse v2引入了一个新字段:“cluster_id”。

  • FetchRequest v3支持限制响应大小(除现有的每个分区限制外),如果需要进行更改,它会返回大于限制的消息,并且请求中的分区顺序现在很重要。

  • JoinGroup v1引入了一个新字段:“rebalance_timeout”。

从0.8.x或0.9.x升级到0.10.0.0

0.10.0.0具有潜在的重大变化(升级前请查看)以及升级后可能产生的 性能影响。通过遵循下面建议的滚动升级计划,您可以保证在升级期间和之后不会出现停机并且不会影响性能。 
注意:由于引入了新协议,因此在升级客户端之前升级Kafka群集非常重要。

对版本为0.9.0.0的客户端的说明:由于0.9.0.0中引入了一个错误,依赖于ZooKeeper的客户端(旧的Scala高级使用者和MirrorMaker,如果与旧的使用者一起使用)将无法与0.10.0.x代理一起使用。因此,在将代理升级到0.10.0.x 之前,应将0.9.0.0客户端升级到0.9.0.1。0.8.X或0.9.0.1客户端不需要此步骤。

对于滚动升级:

  1. 更新所有代理上的server.properties文件并添加以下属性:

    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2或0.9.0.0)。

    • log.message.format.version = CURRENT_KAFKA_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

  2. 升级经纪人。这可以通过简单地将其关闭,更新代码并重新启动来一次完成。

  3. 升级整个群集后,通过编辑inter.broker.protocol.version并将其设置为0.10.0.0来破坏协议版本。注意:您不应该触摸log.message.format.version - 此参数只应在所有使用者升级到0.10.0.0后更改

  4. 逐个重新启动代理以使新协议版本生效。

  5. 将所有使用者升级到0.10.0后,在每个代理上将log.message.format.version更改为0.10.0并逐个重新启动。

注意:如果您愿意接受停机时间,您可以简单地关闭所有代理,更新代码并启动所有代理。默认情况下,它们将以新协议开始。

注意:升级协议版本并重新启动可以在升级代理后随时进行。它不一定要立即。

升级到0.10.0.0后可能会对性能产生影响

0.10.0中的消息格式包括新的时间戳字段,并使用压缩消息的相对偏移量。可以通过server.properties文件中的log.message.format.version配置磁盘消息格式。默认的磁盘消息格式为0.10.0。如果消费者客户端在0.10.0.0之前的版本上,它只能理解0.10.0之前的消息格式。在这种情况下,代理能够在将响应发送到旧版本的消费者之前将消息从0.10.0格式转换为更早的格式。但是,在这种情况下,代理不能使用零拷贝传输。来自Kafka社区的关于性能影响的报告显示,升级后CPU利用率从之前的20%上升到100%,这迫使所有客户端立即升级以使性能恢复正常。要在消费者升级到0.10.0.0之前避免此类消息转换,可以在将代理升级到0.10.0.0时将log.message.format.version设置为0.8.2或0.9.0。这样,代理仍然可以使用零拷贝传输将数据发送给旧的消费者。消费者升级后,可以在代理上将消息格式更改为0.10.0,并享受包含新时间戳和改进压缩的新消息格式。支持转换以确保兼容性,并且可用于支持尚未更新到较新客户端的一些应用程序,但即使是过度配置的群集也支持所有消费者流量,这是不切实际的。因此,当经纪人升级但大多数客户没有升级时,尽可能避免消息转换至关重要。将代理升级到0.10.0.0时,message.format.version为0.8.2或0.9.0。这样,代理仍然可以使用零拷贝传输将数据发送给旧的消费者。消费者升级后,可以在代理上将消息格式更改为0.10.0,并享受包含新时间戳和改进压缩的新消息格式。支持转换以确保兼容性,并且可用于支持尚未更新到较新客户端的一些应用程序,但即使是过度配置的群集也支持所有消费者流量,这是不切实际的。因此,当经纪人升级但大多数客户没有升级时,尽可能避免消息转换至关重要。将代理升级到0.10.0.0时,message.format.version为0.8.2或0.9.0。这样,代理仍然可以使用零拷贝传输将数据发送给旧的消费者。消费者升级后,可以在代理上将消息格式更改为0.10.0,并享受包含新时间戳和改进压缩的新消息格式。支持转换以确保兼容性,并且可用于支持尚未更新到较新客户端的一些应用程序,但即使是过度配置的群集也支持所有消费者流量,这是不切实际的。因此,当经纪人升级但大多数客户没有升级时,尽可能避免消息转换至关重要。代理仍然可以使用零拷贝传输将数据发送给旧的消费者。消费者升级后,可以在代理上将消息格式更改为0.10.0,并享受包含新时间戳和改进压缩的新消息格式。支持转换以确保兼容性,并且可用于支持尚未更新到较新客户端的一些应用程序,但即使是过度配置的群集也支持所有消费者流量,这是不切实际的。因此,当经纪人升级但大多数客户没有升级时,尽可能避免消息转换至关重要。代理仍然可以使用零拷贝传输将数据发送给旧的消费者。消费者升级后,可以在代理上将消息格式更改为0.10.0,并享受包含新时间戳和改进压缩的新消息格式。支持转换以确保兼容性,并且可用于支持尚未更新到较新客户端的一些应用程序,但即使是过度配置的群集也支持所有消费者流量,这是不切实际的。因此,当经纪人升级但大多数客户没有升级时,尽可能避免消息转换至关重要。支持转换以确保兼容性,并且可用于支持尚未更新到较新客户端的一些应用程序,但即使是过度配置的群集也支持所有消费者流量,这是不切实际的。因此,当经纪人升级但大多数客户没有升级时,尽可能避免消息转换至关重要。支持转换以确保兼容性,并且可用于支持尚未更新到较新客户端的一些应用程序,但即使是过度配置的群集也支持所有消费者流量,这是不切实际的。因此,当经纪人升级但大多数客户没有升级时,尽可能避免消息转换至关重要。

对于升级到0.10.0.0的客户端,不会对性能产生影响。

注意:通过设置消息格式版本,可以证明所有现有消息都在该消息格式版本之上或之下。否则0.10.0.0之前的消费者可能会破产。特别是,在消息格式设置为0.10.0后,不应将其更改回早期格式,因为它可能会破坏0.10.0.0之前版本的消费者。

注意:由于每条消息中引入了额外的时间戳,发送小消息的生产者可能会因为增加的开销而看到消息吞吐量降低。同样,复制现在每个消息传输额外的8个字节。如果您的集群运行时接近网络容量,则可能会使网卡瘫痪,并查看由于过载导致的故障和性能问题。

注意:如果已对生产者启用压缩,则在某些情况下,您可能会注意到生成器吞吐量降低和/或代理上的压缩率降低。当接收压缩消息时,0.10.0代理会避免重新压缩消息,这通常会减少延迟并提高吞吐量。但是,在某些情况下,这可能会减少生产者的批量大小,从而导致吞吐量降低。如果发生这种情况,用户可以调整生产者的linger.ms和batch.size以获得更好的吞吐量。此外,用于使用snappy压缩消息的生成器缓冲区小于代理使用的生成器缓冲区,这可能会对磁盘上的消息的压缩率产生负面影响。我们打算在未来的Kafka版本中对此进行配置。

 

潜在的突破变化在0.10.0.0

  • 从Kafka 0.10.0.0开始,Kafka中的消息格式版本表示为Kafka版本。例如,消息格式0.9.0指的是Kafka 0.9.0支持的最高消息版本。

  • 已引入消息格式0.10.0,默认情况下使用它。它包括消息中的时间戳字段,相对偏移量用于压缩消息。

  • 引入了ProduceRequest / Response v2,默认情况下使用它来支持消息格式0.10.0

  • 引入了FetchRequest / Response v2,默认情况下它用于支持消息格式0.10.0

  • MessageFormatter接口已更改def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)为 def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)

  • MessageReader接口已更改def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]为 def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]

  • MessageFormatter的包已更改kafka.toolskafka.common

  • MessageReader的包已更改kafka.toolskafka.common

  • MirrorMakerMessageHandler不再公开该handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])方法,因为它从未被调用过。

  • 0.7 KafkaMigrationTool不再与Kafka打包在一起。如果需要从0.7迁移到0.10.0,请首先迁移到0.8,然后按照记录的升级过程从0.8升级到0.10.0。

  • 新的使用者已将其API标准化为接受java.util.Collection作为方法参数的序列类型。可能必须更新现有代码才能使用0.10.0客户端库。

  • LZ4压缩消息处理已更改为使用可互操作的成帧规范(LZ4f v1.5.1)。为了保持与旧客户端的兼容性,此更改仅适用于消息格式0.10.0及更高版本。使用v0 / v1(消息格式0.9.0)生成/获取LZ4压缩消息的客户端应继续使用0.9.0帧实现。使用Produce / Fetch协议v2或更高版本的客户端应使用可互操作的LZ4f框架。有关可互操作的LZ4库的列表,请访问http://www.lz4.org/

0.10.0.0中的显着变化

  • 从Kafka 0.10.0.0开始,一个名为Kafka Streams的新客户端库可用于对Kafka主题中存储的数据进行流处理。由于上面提到的消息格式更改,此新客户端库仅适用于0.10.x和向上版本的代理。有关更多信息,请阅读Streams文档

  • receive.buffer.bytes对于新的使用者,配置参数的默认值现在为64K。

  • 新的使用者现在公开配置参数exclude.internal.topics以限制内部主题(例如消费者偏移主题)意外地包含在正则表达式订阅中。默认情况下,它已启用。

  • 旧的Scala生产商已被弃用。用户应尽快将其代码迁移到kafka-clients JAR中包含的Java生产者。

  • 新的消费者API已经标记为稳定。

从0.8.0,0.8.1.X或0.8.2.X升级到0.9.0.0

0.9.0.0具有潜在的重大变化(请在升级之前进行检查)以及从先前版本更改的代理间协议。这意味着升级的代理和客户端可能与旧版本不兼容。升级客户端之前升级Kafka群集非常重要。如果您正在使用MirrorMaker,则应首先升级下游群集。

对于滚动升级:

  1. 更新所有代理上的server.properties文件并添加以下属性:inter.broker.protocol.version = 0.8.2.X

  2. 升级经纪人。这可以通过简单地将其关闭,更新代码并重新启动来一次完成。

  3. 升级整个群集后,通过编辑inter.broker.protocol.version并将其设置为0.9.0.0来破坏协议版本。

  4. 逐个重新启动代理以使新协议版本生效

注意:如果您愿意接受停机时间,您可以简单地关闭所有代理,更新代码并启动所有代理。默认情况下,它们将以新协议开始。

注意:升级协议版本并重新启动可以在升级代理后随时进行。它不一定要立即。

潜在的突破性变化为0.9.0.0

  • 不再支持Java 1.6。

  • 不再支持Scala 2.9。

  • 现在,默认情况下,大于1000的代理ID保留为自动分配的代理ID。如果您的群集具有高于该阈值的现有代理ID,请确保相应地增加reserved.broker.max.id代理配置属性。

  • 配置参数replica.lag.max.messages已删除。在决定哪些副本同步时,分区负责人将不再考虑滞后消息的数量。

  • 配置参数replica.lag.time.max.ms现在不仅指自上次从副本获取请求以来所经过的时间,还指自上次捕获副本以来的时间。仍然从领导者那里获取消息但没有赶上replica.lag.time.max.ms中的最新消息的副本将被视为不同步。

  • 压缩主题不再接受没有密钥的消息,如果尝试这样做,则生产者抛出异常。在0.8.x中,没有密钥的消息会导致日志压缩线程随后抱怨并退出(并停止压缩所有压缩的主题)。

  • MirrorMaker不再支持多个目标群集。因此,它只接受单个--consumer.config参数。要镜像多个源群集,每个源群集至少需要一个MirrorMaker实例,每个实例都有自己的使用者配置。

  • org.apache.kafka.clients.tools。*下打包的工具已移至org.apache.kafka.tools。*。所有包含的脚本仍将照常运行,只有直接导入这些类的自定义代码才会受到影响。

  • kafka-run-class.sh中已更改默认的Kafka JVM性能选项(KAFKA_JVM_PERFORMANCE_OPTS)。

  • 现在,kafka-topics.sh脚本(kafka.admin.TopicCommand)在失败时以非零退出代码退出。

  • kafka-topics.sh脚本(kafka.admin.TopicCommand)现在将在主题名称由于使用“。”而导致风险度量标准冲突时打印警告。或主题名称中的“_”,以及实际碰撞时的错误。

  • kafka-console-producer.sh脚本(kafka.tools.ConsoleProducer)将使用Java生成器而不是旧的Scala生成器作为默认值,并且用户必须指定“old-producer”才能使用旧生成器。

  • 默认情况下,所有命令行工具都会将所有日志消息打印到stderr而不是stdout。

0.9.0.1中的显着变化

  • 可以通过将broker.id.generation.enable设置为false来禁用新的代理ID生成功能。

  • 默认情况下,配置参数log.cleaner.enable现在为true。这意味着具有cleanup.policy = compact的主题现在将默认压缩,并且将通过log.cleaner.dedupe.buffer.size将128 MB的堆分配给清理进程。您可能希望根据您对压缩主题的使用情况来查看log.cleaner.dedupe.buffer.size和其他log.cleaner配置值。

  • 默认情况下,新使用者的配置参数fetch.min.bytes的默认值现在为1。

弃权在0.9.0.0

  • 不推荐使用kafka-topics.sh脚本(kafka.admin.TopicCommand)更改主题配置。接下来,请使用kafka-configs.sh脚本(kafka.admin.ConfigCommand)来实现此功能。

  • kafka-consumer-offset-checker.sh(kafka.tools.ConsumerOffsetChecker)已被弃用。展望未来,请使用kafka-consumer-groups.sh(kafka.admin.ConsumerGroupCommand)来实现此功能。

  • 不推荐使用kafka.tools.ProducerPerformance类。接下来,请使用org.apache.kafka.tools.ProducerPerformance来实现此功能(kafka-producer-perf-test.sh也将更改为使用新类)。

  • 生产者配置block.on.buffer.full已被弃用,将在以后的版本中删除。目前,其默认值已更改为false。KafkaProducer将不再抛出BufferExhaustedException,而是使用max.block.ms值来阻塞,之后它将抛出TimeoutException。如果block.on.buffer.full属性显式设置为true,则会将max.block.ms设置为Long.MAX_VALUE,并且metadata.fetch.timeout.ms将不会被授予

从0.8.1升级到0.8.2

0.8.2与0.8.1完全兼容。升级可以通过简单地将其关闭,更新代码并重新启动来一次完成一个代理。

从0.8.0升级到0.8.1

0.8.1与0.8完全兼容。升级可以通过简单地将其关闭,更新代码并重新启动来一次完成一个代理。

从0.7升级

版本0.7与较新版本不兼容。对API,ZooKeeper数据结构,协议和配置进行了重大更改,以便添加复制(在0.7中缺失)。从0.7升级到更高版本需要一个特殊的迁移工具。无需停机即可完成此迁移。

版权声明:

本文为大数据技术与架构整理,原作者独家授权。未经原作者允许转载追究侵权责任。
编辑|冷眼丶
微信公众号|import_bigdata


欢迎点赞+收藏+转发朋友圈素质三连


文章不错?点个【在看】吧! ?

浏览 47
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报