我有以下 Kafka 出站 channel 适配器的 XML 配置:
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-producer-context-ref="kafkaProducerContext"
auto-startup="true"
channel="activityOutputChannel">
<int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>
<task:executor id="taskExecutor"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
这很好用。我试图在 Java DSL 中复制它,但我不能走得太远。到目前为止,我只有这个:
.handle(Kafka.outboundChannelAdapter(kafkaConfig)
.addProducer(producerMetadata, brokerAddress)
.get());
我不知道如何使用 DSL 添加 taskExecutor
和 poller
。
对于如何将这些整合到我的整体 IntegrationFlow
中的任何见解,我们表示赞赏。
最佳答案
Spring Integration 组件(例如 <int-kafka:outbound-channel-adapter>
)由两个 bean 组成:AbstractEndpoint
接受来自 input-channel
的消息和 MessageHandler
处理消息。
所以,Kafka.outboundChannelAdapter()
关于 MessageHandler
.任何其他特定于端点的属性取决于第二个 Consumer<GenericEndpointSpec<H>> endpointConfigurer
.handle()
的参数EIP 方法:
.handle(Kafka.outboundChannelAdapter(kafkaConfig)
.addProducer(producerMetadata, brokerAddress),
e -> e.id("kafkaOutboundChannelAdapter")
.poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS)
.receiveTimeout(0)
.taskExecutor(this.taskExecutor)));
参见 Reference Manual获取更多信息。
关于在 XML 中配置的 Spring Integration Kafka 端点的 Java DSL 等价物,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34441231/