apache-kafka - 重新启动集群时连接消费者作业被删除

标签 apache-kafka apache-kafka-connect

我在更改与 kafka 相关的一些属性并重新启动集群时遇到以下问题。

In kafka Consumer, there were 5 consumer jobs are running . 

如果我们进行了一些重要的属性更改,并且在重新启动集群时,一些/所有现有的消费者作业将无法启动。

Ideally all the consumer jobs should start , 

因为它将从下面的系统主题中获取元数据信息。

config.storage.topic
offset.storage.topic
status.storage.topic

最佳答案

首先,介绍一下背景。 Kafka 将其所有数据存储在 topics 中,但这些主题(或者更确切地说,构成主题的分区)是仅附加日志,除非完成某些操作,否则它们将永远增长。为了防止这种情况,Kafka 有能力通过两种方式清理主题:保留和压缩。配置为使用保留 的主题将在可配置的时间长度内保留数据:代理可以自由删除任何早于该时间的日志消息。配置为使用 compaction 的主题要求每条消息都有一个 key ,代理将始终保留每个不同 key 的最后一条已知消息。当每条消息(即键/值对)代表键的最后已知状态时,压缩非常方便;由于消费者正在阅读主题以获取每个键的最后已知状态,因此如果删除旧状态,他们最终将更快地到达最后一个状态。

代理将对主题使用哪种清理策略取决于几个因素。默认情况下,每个隐式或显式创建的主题都将使用保留,但您可以通过多种方式进行更改:

  • 更改全局 log.cleanup.policy 代理设置,仅影响在那之后创建的主题;或
  • 在您 create or modify a topic 时指定 cleanup.policy 主题特定设置

现在,Kafka Connect 使用多个内部主题 来存储连接器配置、偏移量和状态信息。这些内部主题必须compacted topics这样(至少)每个连接器的最后配置、偏移量和状态始终可用。由于 Kafka Connect 从不使用旧的配置、偏移量和状态,因此代理将它们从内部主题中删除实际上是一件好事。

在 Kafka 0.11.0.0 之前,recommended process是使用正确的主题特定设置手动创建这些内部主题。您可以依靠代理自动创建它们,但由于多种原因这是有问题的,其中最重要的是三个内部主题应该具有不同数量的分区。

如果这些内部主题未压缩,则配置、偏移量和状态信息将在保​​留期结束后清理并删除。默认情况下,此保留期为 24 小时!这意味着,如果您在部署/更新连接器配置后超过 24 小时重新启动 Kafka Connect,则该连接器的配置可能已被清除,并且看起来连接器配置从未存在过。

因此,如果您没有正确创建这些内部主题,只需使用 topic admin tool将主题的设置更新为 described in the documentation .

顺便说一句,没有正确创建这些内部主题是一个非常普遍的问题,以至于 Kafka Connect 0.11.0.0 将能够使用正确的设置自动创建这些内部主题而不依赖代理自动创建主题。

在 0.11.0 中,对于源连接器写入的主题,您仍然必须依赖手动创建或代理自动创建。这并不理想,所以有一个 proposal更改 Kafka Connect 以自动为源连接器创建主题,同时让源连接器控制设置。希望这一改进使其成为 0.11.1.0,以便 Kafka Connect 更易于使用。

关于apache-kafka - 重新启动集群时连接消费者作业被删除,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44699226/

相关文章:

java - 重启数据库后尝试重启 debezium mysql 连接器时出错

java - 如果消息是由生产者产生的,如何从卡夫卡经纪人那里得到确认?

docker - 保留所有kafka主题的数据并注入(inject)不同的实例

apache-kafka - KafKa 分区器类,使用键将消息分配给主题内的分区

java - Kafka-Connect 添加 SQL JAR 文件到类路径

elasticsearch - Confluentinc连接器。一个elasticsearch索引中的多个连接器。以特定的文档类型发布

jdbc - 使用 kafka-connect 从多个主题更新到多个表

docker - 将 KafkaConnect 添加到现有 Ambari 服务

kubernetes - Kubernetes中的max.request.size Kafka

apache-kafka - 使用Kafka Connect时如何转换所有时间戳字段?