在 XML 中配置的 Spring Integration Kafka 端点的 Java DSL 等价物

标签 java spring spring-integration apache-kafka

我有以下 Kafka 出站 channel 适配器的 XML 配置:

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-producer-context-ref="kafkaProducerContext"
                                    auto-startup="true"
                                    channel="activityOutputChannel">
    <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>

</int-kafka:outbound-channel-adapter>
<task:executor id="taskExecutor"
               pool-size="5-25"
               queue-capacity="20"
               keep-alive="120"/>

这很好用。我试图在 Java DSL 中复制它,但我不能走得太远。到目前为止,我只有这个:

.handle(Kafka.outboundChannelAdapter(kafkaConfig)
        .addProducer(producerMetadata, brokerAddress)
        .get());

我不知道如何使用 DSL 添加 taskExecutorpoller

对于如何将这些整合到我的整体 IntegrationFlow 中的任何见解,我们表示赞赏。

最佳答案

Spring Integration 组件(例如 <int-kafka:outbound-channel-adapter> )由两个 bean 组成:AbstractEndpoint接受来自 input-channel 的消息和 MessageHandler处理消息。

所以,Kafka.outboundChannelAdapter()关于 MessageHandler .任何其他特定于端点的属性取决于第二个 Consumer<GenericEndpointSpec<H>> endpointConfigurer .handle() 的参数EIP 方法:

.handle(Kafka.outboundChannelAdapter(kafkaConfig)
    .addProducer(producerMetadata, brokerAddress),
           e -> e.id("kafkaOutboundChannelAdapter")
                 .poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS)
                                        .receiveTimeout(0)
                                        .taskExecutor(this.taskExecutor)));

参见 Reference Manual获取更多信息。

关于在 XML 中配置的 Spring Integration Kafka 端点的 Java DSL 等价物,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34441231/

相关文章:

java - 如何在 Spring 中从 UI 获取对象列表?

spring Scheduler - 每周运行一次

java - 预期类型 : java. lang.Double 实际值 : java. lang.Hibernate 中的整数错误

javascript - Spring MVC 在表单中添加行

java - 在 Spring Boot 的单元测试期间将配置对象注入(inject)到服务 bean

Java ArrayList 比较 - TicTacToe

spring-integration - 实现自定义入站 channel 适配器的最简单方法是什么?

spring-integration - javax.mail.FolderClosedException : * BYE JavaMail Exception: javax.net.ssl.SSLException:IMAP 连接超时(读取失败)

java - 安装了最新的 JDK 1.8.0,但我的 Javac -version 仍然显示旧版本 (Windows 7 - 64)

spring - 基于 JSON 或 Pipe(|) 的 DSL,用于 Spring Integration 中的动态流生成