java - (卡夫卡)在 Storm 中喷发返回事物列表,无法将此列表传播到 bolt

标签 java apache-kafka apache-storm

我是 Storm 新手,请耐心等待。

我有一项服务,每次我们调用结果列表时都会向 kafka 发送一条消息。

有一个 KafkaSpout 读取每条消息,并且该消息包含前面提到的列表。

这只是一个 JSon,我可以打开它。现在,问题来了:

我在与 jackson 的方案中执行此转换操作,但该方案基本上可以返回一个 Values 对象,该对象不是对象列表,而是基本上一个字段值对列表。

另一件事可能是获取 Bolt 这个 Values (它只是一个扩展的 ArrayList)对象,并将其解包为单个元素,并将它们中的每个元素发送到下一个 Bolt。这是一个解决方案吗?我可以通过一次 Bolt 调用发出多个对象吗?

有更聪明的解决方案吗?

最佳答案

是的,您可以从单个 Bolt 发出多个元组。查看位于 here 的示例 bolt 中的执行方法

public void execute(Tuple tuple) {
  String sentence = tuple.getString(0);
  for(String word: sentence.split(" ")) {
    _collector.emit(tuple, new Values(word));  //emits multiple tuples
  }
  _collector.ack(tuple);
}

如您所见,for 循环可以同时发出多个元组。这样做会创建一个更大的消息树。这可能会导致问题,具体取决于您的可靠性保证和数据大小。

根据我的经验,很难/不可能操纵 KafkaSpout 内的数据。关于您的设置的一些注意事项。

  • 我要做的第一件事是更改服务发送到 Kafka 的内容。您可以将其分成单独的元素而不是一件大元素发送吗?如果您无法改变这一点。
  • 像您提到的那样设置多个 Bolt,Spout => UnwrapBolt => ProcessBolt,其中 UnwrapBolt 获取您的一个数据源并将它们作为单独的元组发出,然后 ProcessBolt 获取每个单独的元组并根据需要处理它们。

关于java - (卡夫卡)在 Storm 中喷发返回事物列表,无法将此列表传播到 bolt ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34040095/

相关文章:

java - method.invoke在java中创建一个新线程吗?

docker - 卡夫卡和 Docker : Pushing a Kafka Messages to the another's Dockerized Consumer

apache-kafka - Flink 检查点不断失败(等待 InitProducerId 时超时)

apache-storm - Apache Kafka 与 Apache Storm

java - 用JAVA扫描一系列文件夹

java - 字符串索引超出范围 : -1 error

java - Bolt 的 Apache Storm Junit 测试用例

java - 在 trident 中实现事务拓扑的问题

java - 使用 Java/selenium webdriver 查找 Web 元素

apache-kafka - Kafka、重新分区、局部性和排序