java - Kafka 对 Produce with spring-integration 的认可

标签 java spring apache-kafka spring-integration

使用 Spring Integration Kafka (2.1),我能够成功将消息发送到 Kafka 中的主题。

原生 Kafka 客户端 API 提供了成功发送时回调的选项。我怎样才能用 Spring-integration-Kafka 达到同样的效果。下面是我的配置和代码供您引用。

XML配置

<int:publish-subscribe-channel id="inputToKafka" />

    <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                        auto-startup="true"
                                        channel="inputToKafka"
                                        kafka-template="template"
                                        topic="test"
                                        sync="true">
    </int-kafka:outbound-channel-adapter>

    <bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="localhost:9092" />
                        <!--<entry key="retries" value="0" />
                        <entry key="batch.size" value="16384" />
                        <entry key="linger.ms" value="0" />
                        <entry key="buffer.memory" value="33554432" /> -->
                        <entry key="key.serializer"
                               value="org.apache.kafka.common.serialization.StringSerializer" />
                        <entry key="value.serializer"
                               value="org.apache.kafka.common.serialization.StringSerializer" />
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
    </bean>

发送消息的Java代码

    @Autowired
    @Qualifier("inputToKafka")
    MessageChannel channel;

channel.send(MessageBuilder.withPayload("Test Message").build());

最佳答案

目前没有“消息传递”风格的回调,但您可以使用 KafkaTemplate 注册一个 ProducerListener;请参阅KafkaTemplate .

关于java - Kafka 对 Produce with spring-integration 的认可,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41139395/

相关文章:

java - 强制 hibernate 读取数据库并且不返回缓存的实体

java - 由于明显的类路径冲突而导致 jersey WadlAutoDiscoverable 转换错误?

node.js - 我可以限制kafka Node 消费者的消费吗?

java - 如何使用 Java API 添加 SCRAM-SHA-512 kafka 配置?

java - 检查我的窗口监听器是否正常工作

java - 为什么 Android 应用程序在 runtimeException 后自动重启?

Java 数组搜索和排序未按预期工作

java - java代码的重构

mysql - 在 JBoss 7.1.1 中为 MySQL 集群创建 XA 数据源 : Connection Read Only Error

java - 如何在运行时更新 Spring Boot 应用程序的配置而不重新加载整个 ApplicationContext