java - 在 KafkaBolt 中暴露 Kafka 发布异常

标签 java apache-kafka apache-storm kafka-producer-api

我使用KafkaBolt在 Storm 中向各种 Kafka 主题发布消息。我想在发布逻辑周围放置日志记录和指标,以便我可以围绕发布失败时可能引发的任何异常创建警报。公开这些异常是通过传递到 KafkaProducer.send()Callback 函数来完成的。 ,发布成功或失败后执行。

问题是 KafkaBolt 完全封装了它的 KafkaProducer,因此无法注入(inject)自定义 Callback,所以如果我想看到任何错误我必须在 Storm UI 中查看。我通过为 KafkaBolt 创建包装器来解决这个问题。反过来,该包装器会将传入 KafkaBolt.prepare()OutputCollector 包装在自定义 OutputCollector 中,该自定义 OutputCollector 会覆盖 的行为OutputCollector.reportError()。然后我可以在那里添加我自己的日志记录和指标报告代码,然后让它调用原始方法。

这个解决方案似乎完全足以满足我的需要,但奇怪的是 KafkaBolt 使得以编程方式访问这些异常变得如此困难。我想知道我是否遗漏了一些明显的东西,以及是否有更好的方法来做到这一点。

最佳答案

我认为您没有遗漏任何东西,您可能只是第一个有这种需求的人。必须有人解决这个问题并决定解决它:)

如果您想对 Bolt 进行更改以支持自定义错误处理(例如,通过允许用户按照您的建议提供回调),您可以在 https://issues.apache.org/jira/projects/STORM/issues 提出问题并针对 https://github.com/apache/storm/pulls 进行 PR 。当然,也欢迎您只提出问题,其他人可能会看到它并决定修复它,但自己贡献修复可能会更快。

编辑:您可以在 https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java 找到 bolt 代码

关于java - 在 KafkaBolt 中暴露 Kafka 发布异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51657070/

相关文章:

apache-kafka - 如何更改主题的起始偏移量?

apache-kafka - 没有消息时从Kafka消费者返回

javascript - Angular 4 以及 Node-rdkafka 和 kafka-node 的加载问题

java - 提取引号之间的子字符串,忽略\"

java - 尝试在空对象引用上调用虚拟方法 - Android

streaming - 有状态和无状态流处理

java - 如何以编程方式终止 Apache Storm 拓扑?

java - 线程 "main"java.lang.NoClassDefFoundError : backtype/storm/spout/MultiScheme 中出现异常

java - 如何将一个jTable双击鼠标事件应用于同一包中的不同jFrame

java - 由于浮点精度误差导致的分割平方问题