apache-storm - 为什么我不应该在 Spout.nextTuple() 中循环或阻塞

标签 apache-storm

我看到很多代码片段,其中在 Spout.nextTuple() 中使用了循环。 (例如读取整个文件并为每一行发出一个元组):

public void nextTuple() {
    // do other stuff here

    // reader might be BufferedReader that is initialized in open()
    String str;
    while((str = reader.readLine()) != null) {
        _collector.emit(new Values(str));
    }

    // do some more stuff here
}

这段代码似乎很简单,但是,有人告诉我应该 不循环 nextTuple() .问题是为什么?

最佳答案

当一个 Spout 被执行时,它会在单个线程中运行。该线程“永远”循环并具有多种职责:

  • 调用 Spout.nextTuple()
  • 检索“确认”并处理它们
  • 检索“失败”并处理它们
  • 超时元组

  • 要做到这一点,至关重要的是,您不要“永远”(即循环或阻塞)在 nextTuple() 中。但是在向系统发出元组后返回(或者如果没有元组可以发出则返回,但 不会阻塞 )。否则,Spout 无法正常工作。 nextTuple()将被 Storm 循环调用。因此,在 ack/fail 消息被处理等之后,下一次调用 nextTuple()发生很快。

    因此,在对 nextTuple() 的单个调用中发出多个元组也被认为是不好的做法。 .只要代码还在nextTuple() ,spout 线程不能(例如)对传入的 ack 使用react。这可能会导致不必要的超时,因为无法及时处理 ack。

    最佳实践是为每次调用 nextTuple() 发出一个元组。 .如果没有可用的元组可以发出,您应该返回(不发出)而不是等到元组可用。

    关于apache-storm - 为什么我不应该在 Spout.nextTuple() 中循环或阻塞,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32547935/

    相关文章:

    apache-spark - 是否建议使用 Kafka 作为事实来源?

    java - Apache Storm : InvalidTopologyException(msg:Component: [x] subscribes from non-existent component [y]

    java - Storm-Kafka多个spout,如何分担负载?

    hadoop - Storm 创造拓扑

    java - 如何在 shellbolt 中调用元组上的脚本

    java - 在Linux命令行中运行java文件(Storm helloWorld示例),包括库

    java - Apache Storm - LocalCluster 停止日志记录,但 java 进程仍在运行

    apache-kafka - java.lang.ClassNotFoundException : kafka. api.OffsetRequest

    java - 将拓扑提交到远程集群时,不会调用 spout 的 open() 方法

    java - 在本地模式下运行 Apache Storm 连接错误