java - Spring-Integration XML to Java

Tags: java spring spring-integration

How can I convert this XML to Java configuration?

<int-kafka:outbound-channel-adapter
        id="mainOutboundChannelAdapter"
        kafka-producer-context-ref="kafkaProducerContext"
        channel="mainOutboundTopicChanel">
</int-kafka:outbound-channel-adapter>

Best answer

Yes, you can. Take a look at the latest Spring Integration Java DSL.

Your case might look like this:

@Bean
public IntegrationFlow sendToKafkaFlow(String serverAddress) {
    return f -> f.<String>split(p -> FastList.newWithNValues(100, () -> p), null)
            .handle(kafkaMessageHandler(serverAddress));
}

private KafkaProducerMessageHandlerSpec kafkaMessageHandler(String serverAddress) {
    return Kafka.outboundChannelAdapter(props -> props.put("queue.buffering.max.ms", "15000"))
            .messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
            .addProducer(TEST_TOPIC, serverAddress, this::producer);
}

private void producer(KafkaProducerMessageHandlerSpec.ProducerMetadataSpec metadata) {
    metadata.async(true)
            .batchNumMessages(10)
            .valueClassType(String.class)
            .<String>valueEncoder(String::getBytes)
            .keyEncoder(new IntEncoder(null));
}

UPDATE: without lambdas, but still Spring Integration:

@Bean
@ServiceActivator(inputChannel = "mainOutboundTopicChanel")
public MessageHandler kafkaProducer() {
    return new KafkaProducerMessageHandler<String, String>(kafkaProducerContext());
}

@Bean
public KafkaProducerContext<String, String> kafkaProducerContext() {
    KafkaProducerContext<String, String> kafkaProducerContext = new KafkaProducerContext<String, String>();
    ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<String, String>(TOPIC);
    producerMetadata.setValueClassType(String.class);
    producerMetadata.setKeyClassType(String.class);
    Encoder<String> encoder = new StringEncoder<String>();
    producerMetadata.setValueEncoder(encoder);
    producerMetadata.setKeyEncoder(encoder);
    producerMetadata.setAsync(true);
    Properties props = new Properties();
    props.put("queue.buffering.max.ms", "15000");
    ProducerFactoryBean<String, String> producer =
            new ProducerFactoryBean<String, String>(producerMetadata, kafkaRule.getBrokersAsString(), props);
    ProducerConfiguration<String, String> config =
            new ProducerConfiguration<String, String>(producerMetadata, producer.getObject());
    kafkaProducerContext.setProducerConfigurations(Collections.singletonMap(TOPIC, config));
    return kafkaProducerContext;
}

And don't forget to add @EnableIntegration alongside your @Configuration.
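As a rough sketch of where that annotation goes, the configuration class might look like the following. The class name KafkaOutboundConfig is hypothetical, and the choice of DirectChannel for the channel named in the XML is an assumption; any other MessageChannel implementation could be substituted.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;

// Hypothetical class name; only the two annotations matter here.
@Configuration
@EnableIntegration
public class KafkaOutboundConfig {

    // Assumption: a DirectChannel backs the channel referenced in the XML.
    // The bean name comes from the method name, so it matches
    // inputChannel = "mainOutboundTopicChanel" on the @ServiceActivator.
    @Bean
    public MessageChannel mainOutboundTopicChanel() {
        return new DirectChannel();
    }

    // The kafkaProducer() and kafkaProducerContext() beans from the
    // non-lambda variant above would be declared in this class as well.
}
```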

For the future: every XML tag in Spring is parsed by some NamespaceHandler; in this case it is KafkaNamespaceHandler. Reading its source code, we find these lines:

registerBeanDefinitionParser("outbound-channel-adapter", new KafkaOutboundChannelAdapterParser());
registerBeanDefinitionParser("producer-context", new KafkaProducerContextParser());

Then we go to KafkaOutboundChannelAdapterParser and see that it populates a BeanDefinition:

final BeanDefinitionBuilder kafkaProducerMessageHandlerBuilder =
        BeanDefinitionBuilder.genericBeanDefinition(KafkaProducerMessageHandler.class);

And so on, following the source code.
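The lookup just described (element name, then parser, then bean class) can be mimicked in a few lines of plain Java. This is only a toy model to make the tracing steps concrete: ToyNamespaceHandler and ToyParser are made-up names, not Spring classes, and the mapping of producer-context to KafkaProducerContext is an assumption based on the parser's name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of the lookup described above; these are NOT Spring's classes.
// A "handler" maps an XML element name to a "parser", and each parser
// reports which bean class it would register for that element.
class ToyNamespaceHandler {

    // Stand-in for a bean definition parser: it only names the target class.
    interface ToyParser {
        String beanClassName();
    }

    private final Map<String, ToyParser> parsers = new LinkedHashMap<>();

    ToyNamespaceHandler() {
        // Mirrors the two registrations quoted from KafkaNamespaceHandler.
        parsers.put("outbound-channel-adapter", () -> "KafkaProducerMessageHandler");
        // Assumption: the producer-context parser builds a KafkaProducerContext.
        parsers.put("producer-context", () -> "KafkaProducerContext");
    }

    // Returns the class you would declare as a @Bean for the given XML element.
    String beanClassFor(String elementName) {
        ToyParser parser = parsers.get(elementName);
        if (parser == null) {
            throw new IllegalArgumentException("No parser registered for <" + elementName + ">");
        }
        return parser.beanClassName();
    }

    public static void main(String[] args) {
        ToyNamespaceHandler handler = new ToyNamespaceHandler();
        System.out.println(handler.beanClassFor("outbound-channel-adapter"));
    }
}
```

In practice the same three steps are done by reading the real sources: find the handler for the XML namespace, find the parser registered for the tag, and see which class its BeanDefinition is built from.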

UPDATE 2

The consumer part:

@Bean
@InboundChannelAdapter(value = "fromKafkaChannel",
    poller = @Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
public MessageSource<Map<String, Map<Integer, List<Object>>>> kafkaMessageSource() {
    return new KafkaHighLevelConsumerMessageSource<String, String>();
}

@Bean
public KafkaConsumerContext<String, String> kafkaConsumerContext() {
    KafkaConsumerContext<String, String> kafkaConsumerContext = new KafkaConsumerContext<String, String>();
    .....
    kafkaConsumerContext.setConsumerConfigurations(map);
    return kafkaConsumerContext;
}

Source: a similar question on Stack Overflow about java - Spring-Integration XML to Java: https://stackoverflow.com/questions/30180057/
