java - 为 Kafka 实现 Spring Integration InboundChannelAdapter

标签 java spring-integration apache-kafka

我正在尝试在 Spring 集成中实现自定义入站 channel 适配器以使用来自 apache kafka 的消息。基于 Spring 集成示例,我发现我需要创建一个实现 MessageSource 接口(interface)的类并实现 receive() 方法,该方法将从 kafka 返回消费的消息。但基于consumer example in kafka ,KafkaStream 中的消息迭代器由 BlockingQueue 支持。因此,如果队列中没有消息,则线程将被阻塞。

那么实现 receive() 方法的最佳方法是什么,因为该方法可能会阻塞,直到有东西可以消耗为止。?

从更一般的意义上来说,我们如何为流消息源实现一个自定义入站 channel ,该 channel 会阻塞,直到有东西可供使用......?

最佳答案

receive() 方法可以阻塞(只要底层操作正确响应中断的线程),并且从入站 channel 适配器的角度来看,根据底层源的期望,最好使用固定延迟触发器。例如,当提供非常小的延迟值时,“长轮询”可以模拟事件驱动的行为。

我们的 JMS 轮询 MessageSource 实现中也有类似的情况。在那里,底层行为由 JmsTemplate 的 receive() 方法之一处理。 JmsTemplate 本身允许配置超时值。这意味着,举例来说,您可以选择最多阻塞 5 秒,但随后在每个阻塞接收调用之间有一个非常短的延迟触发。或者,您可以指定无限期的接收超时。该决定最终取决于底层资源、消息吞吐量等的期望。

此外,我想让您知道我们正在自己探索 Kafka 适配器。也许您想在 spring-integration-extensions 存储库中对此进行协作?

问候, 标记

关于java - 为 Kafka 实现 Spring Integration InboundChannelAdapter,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14637140/

相关文章:

java - 入站和出站网关 AMQP 注释

java - jndi 查找 DefaultFtpSessionFactory

json - 使用 ClickHouse 使用来自 Kafka 的嵌套 JSON 消息

java - Spring cloud stream kafka binder 是否有一个很好的例子来使用通用的 json 消息

java - 关于聚合消息

java - Elasticsearch 查询插件

java - LibreOffice 邮件合并与 Java

具有相互证书的 WCF 服务互操作的 Java 客户端 - 无法解析用于验证签名的 KeyInfo

java - Kafka 事务失败但仍然提交偏移量

java - 日志记录有时有效,但有时无效