我正在使用storm-kafka-1.1.1-plus和storm 1.1.1。并配置使用BaseRichBolt、一个KafkaSpout和两个 bolt bolt-A、Bolt-B,一旦bolt-B确认,元组就会锚定在bolt-A中,它将被视为成功处理的元组并且它将被 promise 。但是,问题是由于某种原因,一些失败的消息在 KafkaSpout 中重复。
例如
KafkaSpout 在处理时发出了 1000 个元组,由于某种原因,近 20 个元组失败了(在 Bolt-B 处)。这 20 个元组连续重播,在某个时刻,worker 被杀死,supervisor 重新启动了worker,这 20 个元组再次重播,这次它成功处理,但处理了多次(重复)。
但是,我希望这些元组只能处理一次(成功)。我已将 topology.enable.message.timeouts 设置为 false。我的另一个问题是Storm 在哪里存储那些失败的 Kafka 偏移详细信息。我在动物园管理员上没有找到它,它只有以下详细信息。
{"topology":{"id":"test_Topology-12-1508938595","name":"test_Topology"},"offset":505,"partition":2,"broker":{"host":“127.0.0.1”,“端口”:9092},“主题”:“test_topic_1”}
最佳答案
禁用消息超时可能会导致消息丢失,如果您需要处理所有消息,您可能需要重新考虑禁用它。
启用确认时,Storm 提供至少一次处理保证。您可能想看看是否可以使您的 bolt 幂等,这样重播就不会给您带来问题。或者您可以查看 https://storm.apache.org/releases/1.1.1/Trident-tutorial.html ,它提供一次性状态更新。
编辑: 您可能需要重新考虑您的问题。据我所知,没有一个流处理系统能够提供您想要的一次性处理。
Trident 提供的精确一次语义是,Trident 将帮助您使状态更新幂等,因此从数据存储的角度来看,消息“看起来”只被处理一次。处理仍然是至少一次。请参阅 https://storm.apache.org/releases/2.0.0-SNAPSHOT/Trident-state.html 处的“事务性 spouts”部分(可能还有页面的其余部分)。以获得关于这将如何运作的直觉。基本思想是在数据存储中存储有关哪些消息已被写入的信息,这样如果它们被重复,状态更新代码可以忽略它们。
您可能还想阅读https://streaml.io/blog/exactly-once 。我想说的是,Flink 实现了类似于那里描述的分布式快照算法的东西,这是在至少一次系统中模拟精确一次的不同方式。
关于java - Storm KafkaSpout 失败元组重复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46946895/