java - 如何将此 spring-integration 配置从 XML 转换为 Java?

标签 java spring spring-integration

这个特定的部分在应用程序中实现而不是在 XML 中更有意义,因为它是整个集群中的常量,而不是本地化到单个作业。

通过剖析 XSD,在我看来,int-kafka:outbound-channel-adapter 的 xml 构造了一个 KafkaProducerMessageHandler。

没有可见的方式来设置 channel 、主题或大多数其他属性。

请注意潜在的反对者 - (咆哮)我已经使用 RTFM 一周了,比开始时更加困惑。我对语言的选择已经从形容词发展到副词,并且我开始从其他语言借用单词。答案或许就在里面。但即使是这样,也不是凡人能找到的。 (咆哮)

XML 配置:

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="kafkaTemplate"
                                    auto-startup="false"
                                    channel="outbound-staging"
                                    topic="foo"
                                    sync="false"
                                    message-key-expression="'bar'"
                                    send-failure-channel="failures"
                                    send-success-channel="successes"
                                    partition-id-expression="2">
</int-kafka:outbound-channel-adapter>

如果是这样,那么我希望 java 配置看起来像这样:

@Bean
public KafkaProducerMessageHandler kafkaOutboundChannelAdapter () {
    KafkaProducerMessageHandler result = new KafkaProducerMessageHandler(kafkaTemplate());

    result.set????? ();    // WTH?? No methods for most of the attributes?!!!

    return result;
}

编辑:有关正在解决的高级问题的其他信息

作为一个更大项目的一部分,我正在尝试实现 https://docs.spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html#remote-partitioning 中的教科书示例,使用 Kafka 支持而不是 JMS 支持。

我相信最终的集成流程应该是这样的:

partitionHandler -> messagesTemplate -> 出站请求 (DirectChannel) -> 出站暂存 (KafkaProducerMessageHandler) -> kafka

kafka ->executionContainer(KafkaMessageListenerContainer)->inboundKafkaRequests(KafkaMessageDrivenChannelAdapter)->入站请求(DirectChannel)->serviceActivator(StepExecutionRequestHandler)

serviceActivator (StepExecutionRequestHandler) -> 回复暂存 (KafkaProducerMessageHandler) -> kafka

kafka->replyContainer(KafkaMessageListenerContainer)->inboundKafkaReplies(KafkaMessageDrivenChannelAdapter)->inbound-replies(DirectChannel)->partitionhandler

最佳答案

不确定你的意思是它们被错过了,但这是我在 KafkaProducerMessageHandler 的源代码中看到的:

public void setTopicExpression(Expression topicExpression) {
    this.topicExpression = topicExpression;
}

public void setMessageKeyExpression(Expression messageKeyExpression) {
    this.messageKeyExpression = messageKeyExpression;
}

public void setPartitionIdExpression(Expression partitionIdExpression) {
    this.partitionIdExpression = partitionIdExpression;
}

/**
 * Specify a SpEL expression to evaluate a timestamp that will be added in the Kafka record.
 * The resulting value should be a {@link Long} type representing epoch time in milliseconds.
 * @param timestampExpression the {@link Expression} for timestamp to wait for result
 * fo send operation.
 * @since 2.3
 */
public void setTimestampExpression(Expression timestampExpression) {
    this.timestampExpression = timestampExpression;
}

等等。

您还可以访问父类(super class) setter ,例如用于 XML 变体的 setSync()

input-channel 不是 MessageHandler 的职责。它会转到 Endpoint,并且可以通过 @ServiceActivator@Bean 一起进行配置。

请参阅核心 Spring 集成引用手册中的更多信息:https://docs.spring.io/spring-integration/reference/html/#annotations_on_beans

开头还有很重要的一章:https://docs.spring.io/spring-integration/reference/html/#programming-tips

此外,最好考虑使用 Java DSL,而不是直接使用 MessageHandler:

             Kafka
                .outboundChannelAdapter(producerFactory)
                .sync(true)
                .messageKey(m -> m
                        .getHeaders()
                        .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                .headerMapper(mapper())
                .partitionId(m -> 0)
                .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
                .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic))
                .get();

在提到的 Spring Integration 文档中查看有关 Java DSL 的更多信息:https://docs.spring.io/spring-integration/reference/html/#java-dsl

关于java - 如何将此 spring-integration 配置从 XML 转换为 Java?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55385449/

相关文章:

java - RequestBody JSON 格式有单引号

java - spring jsp <%%> 在页面上可见

spring-integration - JMS 出站网关请求目的地 - 成功后处理

Spring SFTP 出站适配器 - 确定文件何时发送

java - 是否每种数据类型都继承自 Object?

java - 我必须如何设置自己的 hibernate 连接?

java - 如何在 Spring `@Value` 注释中插入类常量?

java - 从数据库中获取数据并以pojo对象的形式返回

java - Java 流式 XML 解析器能否区分空元素和自闭空元素?

java - Soap 消息调用 Web 服务时的 NullPointer