java - Storm KafkaSpout 失败元组重复

标签 java apache-kafka apache-zookeeper apache-storm

我正在使用storm-kafka-1.1.1-plus和storm 1.1.1。并配置使用BaseRichBolt、一个KafkaSpout和两个 bolt bolt-A、Bolt-B,一旦bolt-B确认,元组就会锚定在bolt-A中,它将被视为成功处理的元组并且它将被 promise 。但是,问题是由于某种原因,一些失败的消息在 KafkaSpout 中重复

例如

KafkaSpout 在处理时发出了 1000 个元组,由于某种原因,近 20 个元组失败了(在 Bolt-B 处)。这 20 个元组连续重播,在某个时刻,worker 被杀死,supervisor 重新启动了worker,这 20 个元组再次重播,这次它成功处理,但处理了多次(重复)。

enter image description here

但是,我希望这些元组只能处理一次(成功)。我已将 topology.enable.message.timeouts 设置为 false。我的另一个问题是Storm 在哪里存储那些失败的 Kafka 偏移详细信息。我在动物园管理员上没有找到它,它只有以下详细信息。

{"topology":{"id":"test_Topology-12-1508938595","name":"test_Topology"},"offset":505,"partition":2,"broker":{"host":“127.0.0.1”,“端口”:9092},“主题”:“test_topic_1”}

最佳答案

禁用消息超时可能会导致消息丢失,如果您需要处理所有消息,您可能需要重新考虑禁用它。

启用确认时,Storm 提供至少一次处理保证。您可能想看看是否可以使您的 bolt 幂等,这样重播就不会给您带来问题。或者您可以查看 https://storm.apache.org/releases/1.1.1/Trident-tutorial.html ,它提供一次性状态更新。

编辑: 您可能需要重新考虑您的问题。据我所知,没有一个流处理系统能够提供您想要的一次性处理。

Trident 提供的精确一次语义是,Trident 将帮助您使状态更新幂等,因此从数据存储的角度来看,消息“看起来”只被处理一次。处理仍然是至少一次。请参阅 https://storm.apache.org/releases/2.0.0-SNAPSHOT/Trident-state.html 处的“事务性 spouts”部分(可能还有页面的其余部分)。以获得关于这将如何运作的直觉。基本思想是在数据存储中存储有关哪些消息已被写入的信息,这样如果它们被重复,状态更新代码可以忽略它们。

您可能还想阅读https://streaml.io/blog/exactly-once 。我想说的是,Flink 实现了类似于那里描述的分布式快照算法的东西,这是在至少一次系统中模拟精确一次的不同方式。

关于java - Storm KafkaSpout 失败元组重复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46946895/

相关文章:

java - 从注释处理器应用 CGLib 代理

java - Camel 卡夫卡路线不熬夜

apache-kafka - 汇合 4.1.0 -> KSQL : STREAM-TABLE join -> table data null

docker - 远程访问运行在 kubernetes 中的 Kafka

apache-kafka - 使用 Avro 转换器运行 Kafka Connect : ConfigException: "Missing Schema registry url"

ssl - Kafka设置SSL是否影响Zookeeper通信

java 。根据 WSDL 验证 SOAP 消息

java - JSP 中的样式表为空

linux - 运行几天后的 Kafka LeaderNotAvailableException

kafka Web 控制台的 mysql 配置