java - 为什么我的所有 Kafka 消息都在 Storm 中重播?

标签 java apache-kafka apache-storm

我想弄清楚为什么每次我重新启动 Storm 拓扑时我的所有 Kafka 消息都会被重播。

我的理解是,一旦最后一个 Bolt 确认了元组,spout 就应该在 Kafka 上提交消息,因此我不应该在重启后看到它重播。

我的代码是一个简单的 Kafka-spout 和一个 Bolt,它只打印每条消息然后确认它们。

private static KafkaSpout buildKafkaSpout(String topicName) {
    ZkHosts zkHosts = new ZkHosts("localhost:2181");
    SpoutConfig spoutConfig = new SpoutConfig(zkHosts, 
            topicName, 
            "/" + topicName, 
            "mykafkaspout");      /*was:UUID.randomUUID().toString()*/
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    return new KafkaSpout(spoutConfig);
}

public static class PrintBolt extends BaseRichBolt {
    OutputCollector _collector;
    public static Logger LOG = LoggerFactory.getLogger(PrintBolt.class);

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        LOG.error("PrintBolt.0: {}",tuple.getString(0));
        _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("nothing"));
    }
}

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("kafka", buildKafkaSpout("mytopic"), 1);
    builder.setBolt("print1", new PrintBolt(),1).shuffleGrouping("kafka");
}

除了代码中的设置,我没有提供任何配置设置。

我是否缺少配置设置或我做错了什么?

更新:

澄清一下,在我重新启动管道之前一切正常。以下行为是我在其他(非 Storm )消费者中可以获得的,以及我对 KafkaSpout 的期望

我的期望: Expected behavior

然而,我使用默认设置的实际行为如下。消息处理得很好,直到我停止管道,然后当我重新启动时,我得到所有消息的重播,包括那些我认为我已经确认的消息(A 和 B)

实际发生了什么: EarliestTime, default behavior

根据 configuration options Matthias 提到,我可以将 startOffsetTime 更改为 Latest,但这实际上是管道丢弃消息(消息“C”)时生成的最新消息管道正在重新启动。

LatestTime, messages dropped

我有一个用 NodeJS 编写的消费者(使用 npm kafka-node),它能够向 Kafka 确认消息,当我重新启动 NodeJs 消费者时,它完全符合我的预期( catch 消息“C”时产生的consumer were down and continue from there) -- 那么我如何使用 KafkaSpout 获得相同的行为?

最佳答案

问题出在提交代码中——如果 storm jar 在没有拓扑名称的情况下运行,用于提交拓扑的模板代码将创建一个 LocalCluster 实例,并且本地集群不捕获状态,因此不捕获重放。

所以

$ storm jar myjar.jar storm.myorg.MyTopology topologyname

将在我的 single node development cluster 上启动它, 其中

$ storm jar myjar.jar storm.myorg.MyTopology

将在 LocalCluster 实例上启动它

关于java - 为什么我的所有 Kafka 消息都在 Storm 中重播?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33060800/

相关文章:

java - 如何在另一种方法中使用 findViewById 变量。 Java 安卓

java - Spring 3.0.5 和 Hibernate 3.5.3 - 包引用错误?

java - 如果 kafka 消费者指定的偏移量不存在于 Broker 中,会发生什么情况?

java - 'exactly once' 是否仅适用于流(主题 1 -> 应用程序 -> 主题 2)?

java - 如何限制 Trident DRPC 结果仅包含拓扑的最后一个函数的字段?

java - Java <identifier>预期错误?

hadoop - 使用Spring Boot在Kerberized kafka集群到hadoop集群之间的数据流

hadoop - Storm UI 拓扑不起作用

apache-storm - Storm spout - 如何使用 >1 个线程仅从文本文件中读取所有行一次?

java - 从 GUI java 将对象的属性写入文件时出错