apache-kafka - 无法在具有多个主题分区的 Kafka Streams 中重新平衡错误

标签 apache-kafka apache-kafka-streams

当源主题分区计数 = 1 时工作正常。如果我将分区增加到任何 > 1 的值,我会看到以下错误。适用于 Low level 和 DSL API。任何指针?可能缺少什么?

org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] Store in-memory-avg-store's change log (cpu-streamz-in-memory-avg-store-changelog) does not contain partition 1
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:185)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
        at org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier$MemoryStore.init(InMemoryKeyValueStoreSupplier.java:102)
        at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:56)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
        at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)

最佳答案

这是一个操作问题。 Kafka Streams 不允许在其“生命周期”期间更改输入主题分区的数量。

如果您停止正在运行的 Kafka Streams 应用程序,更改输入主题分区的数量,然后重新启动您的应用程序,它将中断(出现上面的错误)。在生产用例中修复此问题很棘手,强烈建议 不是 更改输入主题分区的数量(参见下面的评论)。对于 POC/演示,修复并不难。

为了解决这个问题,您应该使用 Kafka 的应用程序重置工具重置您的应用程序:

  • http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
  • https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

  • 使用应用程序重置工具的缺点是您会清除整个应用程序状态。因此,为了让您的应用程序进入与以前相同的状态,您需要从头开始重新处理整个输入主题。这当然只有在所有输入数据仍然可用并且应用主题保留时间/大小策略的代理没有删除任何数据的情况下才有可能。

    此外,您应该注意,向输入主题添加分区会更改主题的分区模式(默认为基于哈希的键分区)。因为 Kafka Streams 假设输入主题按 key 正确分区,如果您使用重置工具并重新处理所有数据,您可能会得到错误的结果,因为“旧”数据的分区与"new"数据不同(即添加后写入的数据)新分区)。对于生产用例,您需要从原始主题中读取所有数据并将其写入新主题(分区数量增加)以正确分区数据(当然,此步骤可能会更改具有不同的记录的顺序)键——通常不应该是什么问题——只是想提一下)。之后,您可以将新主题用作 Streams 应用程序的输入主题。

    也可以使用运算符 through("new_topic_with_more_partitions") 在您的 Streams 应用程序中轻松完成此重新分区步骤。直接在阅读原始主题之后,在进行任何实际处理之前。

    但是,一般而言,建议对生产用例的主题进行过度分区,以便以后永远不需要更改分区数。过度分区的开销相当小,并为您节省了以后的很多麻烦。如果您使用 Kafka,这是一个一般性建议——它不仅限于 Streams 用例。

    One more remark:

    Some people might suggest to increase the number of partitions of Kafka Streams internal topics manually. First, this would be a hack and is not recommended for certain reasons.

    1. It might be tricky to figure out what the right number is, as it depends on various factors (as it's a Stream's internal implementation detail).
    2. You also face the problem of breaking the partitioning scheme, as described in the paragraph above. Thus, you application most likely ends up in an inconsistent state.

    In order to avoid inconsistent application state, Streams does not delete any internal topics or changes the number of partitions of internal topics automatically, but fails with the error message you reported. This ensure, that the user is aware of all implications by doing the "cleanup" manually.



    顺便说一句:对于即将到来的卡夫卡0.10.2此错误消息得到改进:https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L100-L103

    关于apache-kafka - 无法在具有多个主题分区的 Kafka Streams 中重新平衡错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42329387/

    相关文章:

    apache-kafka-streams - Kafka Stream 0.10.2.0 状态存储在存储值时获取异常

    java - 在kafka 0.8 API中,创建OffsetRequest时replicaId值应该是多少?

    java - 发送字节数组到storm kafka bolt

    java - Kafka Streams API 中的 ArrayList Serde 问题

    scala - 卡夫卡流0.10.1 "Failed to flush state store"

    apache-kafka-streams - Kafka 流使用 dsl api 中调用的处理器的上下文转发

    apache-kafka-streams - Kafka 流构建物化 View

    apache-kafka - 由于消费者速度慢,Kafka 重新平衡主题中的数据

    java - 如何将我的流值映射到我的对象类

    java - 两个 Kafka 消费者互相造成奇怪的行为