apache-kafka - 如何将消息从kafka-sink路由到多个主题

标签 apache-kafka spring-integration spring-xd kafka-producer-api

我有一个带有 http-outbound-gateway 的 spring-xd http-processor 模块,它有一个 errorChannel 和 outputChannel。任何带有 HTTP 200 的消息都会到达 outputChannel,其余消息则落在 failureChannel 中。

现在,http-processor 模块通过带有 TopicX 的 kafka-outbound-adapter 连接到 Kafka-Sink。 TopicX 仅接收 HTTP 200 消息以进行进一步处理。现在,我们需要将 failureChannel 中的消息路由到 TopicY。

如何在 kafka-sink 中向多个 kafka 主题发送消息。我在消息 header 中有 httpStatusCode。我的项目中使用的Kafka版本是0.8.2,java版本是1.7

<!-- http-processor-config -->
<int-http:outbound-gateway
        request-channel="input"
        url-expression="'myUrlLink'"
        http-method="POST"
        expected-response-type="java.lang.String"
        charset="UTF-8"
        reply-timeout="10"
        reply-channel="output">

        <int-http:request-handler-advice-chain>
                    <bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
                        <property name="recoveryCallback">
                            <bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
                                <constructor-arg ref="errorChannel" />
                            </bean>
                        </property>
                        <property name="retryTemplate" ref="retryTemplate" />
                    </bean>
        </int-http:request-handler-advice-chain>

</int-http:outbound-gateway>


<!-- Handle failed messages and route to failureChannel for specific http codes-->
<int:service-activator input-channel="errorChannel" ref="customErrorHandler" method="handleFailedRequest" output-channel="failureChannel"/>

在 Kafka Sink 上,我有以下生产者上下文:

    <int-kafka:producer-context id="kafkaProducerContext">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration broker-list="localhost:9092"
                                          topic="${topicX}"
                                          key-class-type="java.lang.String"
                                          key-serializer="serializer"
                                          value-class-type="[B"
                                          value-serializer="valueSerializer"/>
    </int-kafka:producer-configurations>
</int-kafka:producer-context>

最佳答案

确实,它现在不受支持,也不会受支持。 Spring XD 今年已经停产。鼓励每个人迁移到 Spring Cloud Data Flow .

对于您的用例,您可以编辑 Kafka Sink 模块配置。再添加一张 <int-kafka:outbound-channel-adapter>这是另一个话题。要决定将传入消息发送到哪个主题,您可以添加 <router>到这个配置。

或者只是考虑使用Router Sink 。每种消息类型以及每个主题都有两个单独的流。

关于apache-kafka - 如何将消息从kafka-sink路由到多个主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43074278/

相关文章:

java - Spring Integration 解码变压器 Jaxb2Marshaller 性能问题

spring - Spring-xd Stream正在将空文件写入我的HDFS

java - 如何使用 Spring Batch 查看集群中的所有作业?

ssis - Kafka 与 StreamSets

apache-kafka - 在序列/类图中将消费/生产事件表示为从微服务到消息队列[Kafka]的UML组件

apache-kafka - 使用 SCS 删除消费消息的 kafka 日志

java - KafkaConsumer 在轮询时进入无限期等待状态

elasticsearch - 排队机制和 Elasticsearch 1.4.0

spring - Spring集成方法发送WebSocket消息时不广播

hadoop - 如何将数据从静态网站导入HDFS?