java - Spring Cloud 流卡夫卡流DLQ

标签 java spring apache-kafka stream cloud

我正在将 Apache Kafka 2.7.0 与 Spring Cloud Stream Kafka Streams 结合使用。

在我的 Spring Cloud Stream (Kafka Streams) 应用程序中,我已将 application.yml 配置为在输入主题中的消息出现反序列化错误时使用 sendToDlq 机制:

spring:
  cloud:
    stream:
      function:
        definition: processor      
      bindings:         
        processor-in-0:
          destination: input-topic
          consumer:
            dlqName: input-topic-dlq
        processor-out-0:
          destination: output-topic       
      kafka:
        streams:
            binder:
              deserialization-exception-handler: sendToDlq
            configuration:
              metrics.recording.level: DEBUG
            brokers:
              - localhost:9092

我启动我的应用程序,但没有看到此主题存在。文档指出,如果不存在,将创建 DLQ 主题。

如果我尝试从 DLQ 主题进行消费,则会收到如下错误:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input-topic-dlq --property print.value=true --property print.key=true --from-beginning
[2021-03-19 10:17:09,936] WARN [Consumer clientId=consumer-console-consumer-85295-1, groupId=console-consumer-85295] Error while fetching metadata with correlation id 2 : {input-topic-dlq=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

此时,当我查询 Zookeeper ls/brokers/topics 时,我会看到已创建的主题。

现在,我尝试将非 JSON 消息发布到输入主题(我的默认反序列化器是 JSON)。

但是我在创建的 input-topic-dlq 主题中看不到任何消息。

奇怪的是我可以在默认的“error.input-topic-dlq.appId”主题中看到消息。

我在这里做错了什么吗?

最佳答案

我设法弄清楚了。 Spring Cloud Stream Kafka Streams Binder 的当前文档中似乎存在拼写错误。

绑定(bind)的目标应该在 spring.cloud.streams.bindings 级别,就像您已经拥有的一样,但特定于实现的消费者属性应该在 spring.cloud.streams.kafka.streams 上.绑定(bind)级别。

所以你的配置应该如下所示:

spring:
  cloud:
    stream:
      function:
        definition: processor      
      bindings:         
        processor-in-0:
          destination: input-topic
        processor-out-0:
          destination: output-topic       
      kafka:
        streams:
            binder:
              deserialization-exception-handler: sendToDlq
            bindings:
              processor-in-0:
                consumer:
                  dlqName: input-topic-dlq
            configuration:
              metrics.recording.level: DEBUG
            brokers:
              - localhost:9092

关于java - Spring Cloud 流卡夫卡流DLQ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66700581/

相关文章:

java - 灰熊队 Jersey : Getting MessageBodyWriter not found for media type=application/json

apache-kafka - kafka 本地状态存储/变更日志中的保留时间

concurrency - 事件溯源 : concurrently creating conflicting events

forms - Spring 表单错误地解码撇号

apache-kafka - 如何在达到特定大小 (128 Mb) 时将 Kafka 消息提交到 HDFS 接收器

java - 使用Python的Avro库推送数据时Kafka AVRO反序列化错误

java - 存储和搜索大量键、值的最佳方式

java - thymeleaf 和 session

java - Spring-API 设计子 Controller 中缺少parentid

android - 将 Spring 依赖注入(inject)代码移至 Android