这个特定的部分在应用程序中实现而不是在 XML 中更有意义,因为它是整个集群中的常量,而不是本地化到单个作业。
通过剖析 XSD,在我看来,int-kafka:outbound-channel-adapter
的 xml 构造了一个 KafkaProducerMessageHandler。
没有可见的方式来设置 channel 、主题或大多数其他属性。
请注意潜在的反对者 - (咆哮)我已经使用 RTFM 一周了,比开始时更加困惑。我对语言的选择已经从形容词发展到副词,并且我开始从其他语言借用单词。答案或许就在里面。但即使是这样,也不是凡人能找到的。 (咆哮)
XML 配置:
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-template="kafkaTemplate"
auto-startup="false"
channel="outbound-staging"
topic="foo"
sync="false"
message-key-expression="'bar'"
send-failure-channel="failures"
send-success-channel="successes"
partition-id-expression="2">
</int-kafka:outbound-channel-adapter>
如果是这样,那么我希望 java 配置看起来像这样:
@Bean
public KafkaProducerMessageHandler kafkaOutboundChannelAdapter () {
KafkaProducerMessageHandler result = new KafkaProducerMessageHandler(kafkaTemplate());
result.set????? (); // WTH?? No methods for most of the attributes?!!!
return result;
}
编辑:有关正在解决的高级问题的其他信息
作为一个更大项目的一部分,我正在尝试实现 https://docs.spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html#remote-partitioning 中的教科书示例,使用 Kafka 支持而不是 JMS 支持。
我相信最终的集成流程应该是这样的:
partitionHandler -> messagesTemplate -> 出站请求 (DirectChannel) -> 出站暂存 (KafkaProducerMessageHandler) -> kafka
kafka ->executionContainer(KafkaMessageListenerContainer)->inboundKafkaRequests(KafkaMessageDrivenChannelAdapter)->入站请求(DirectChannel)->serviceActivator(StepExecutionRequestHandler)
serviceActivator (StepExecutionRequestHandler) -> 回复暂存 (KafkaProducerMessageHandler) -> kafka
kafka->replyContainer(KafkaMessageListenerContainer)->inboundKafkaReplies(KafkaMessageDrivenChannelAdapter)->inbound-replies(DirectChannel)->partitionhandler
最佳答案
不确定你的意思是它们被错过了,但这是我在 KafkaProducerMessageHandler
的源代码中看到的:
public void setTopicExpression(Expression topicExpression) {
this.topicExpression = topicExpression;
}
public void setMessageKeyExpression(Expression messageKeyExpression) {
this.messageKeyExpression = messageKeyExpression;
}
public void setPartitionIdExpression(Expression partitionIdExpression) {
this.partitionIdExpression = partitionIdExpression;
}
/**
* Specify a SpEL expression to evaluate a timestamp that will be added in the Kafka record.
* The resulting value should be a {@link Long} type representing epoch time in milliseconds.
* @param timestampExpression the {@link Expression} for timestamp to wait for result
* fo send operation.
* @since 2.3
*/
public void setTimestampExpression(Expression timestampExpression) {
this.timestampExpression = timestampExpression;
}
等等。
您还可以访问父类(super class) setter ,例如用于 XML 变体的 setSync()
。
input-channel
不是 MessageHandler
的职责。它会转到 Endpoint
,并且可以通过 @ServiceActivator
与 @Bean
一起进行配置。
请参阅核心 Spring 集成引用手册中的更多信息:https://docs.spring.io/spring-integration/reference/html/#annotations_on_beans
开头还有很重要的一章:https://docs.spring.io/spring-integration/reference/html/#programming-tips
此外,最好考虑使用 Java DSL,而不是直接使用 MessageHandler
:
Kafka
.outboundChannelAdapter(producerFactory)
.sync(true)
.messageKey(m -> m
.getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.headerMapper(mapper())
.partitionId(m -> 0)
.topicExpression("headers[kafka_topic] ?: '" + topic + "'")
.configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic))
.get();
在提到的 Spring Integration 文档中查看有关 Java DSL 的更多信息:https://docs.spring.io/spring-integration/reference/html/#java-dsl
关于java - 如何将此 spring-integration 配置从 XML 转换为 Java?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55385449/