java - 具有多种消息类型的 Kafka 和主题 (Avro) : Consumer(spring cloud spring fails due to the lack of class on classpath

标签 java apache-kafka spring-cloud-stream

主题包含两种类型的消息:PaymentStarted 和 PaymentCompleted。 有 2 个独立的微服务与消费者。 所以:Microservice_1,有consumer1,它应该获取PaymentStarted类型; Microservice_2,有consumer2,它应该获取PaymentCompleted类型。 此外,Microservice_1 在其 jvm 类路径上仅包含 PaymentStarted,并且 Microservice_2 在其 jvm 类路径上仅包含 PaymentCompleted。

每个服务用于处理不同类型的消息。 我正在使用 spring-cloud-stream,因此在我的 Microservice_1 中,消费者 1 具有:

@StreamListener(target = PayBindings.EVENTS, condition = "headers['eventType']=='com.company.domain.PaymentStarted'")
    void onPaymentEvents(Message<PaymentStarted> message){
    }

类似地,Microservice_2 消费者 2 具有:

@StreamListener(target = PayBindings.EVENTS, condition = "headers['eventType']=='com.company.domain.PaymentCompleted'")
    void onPaymentEvents(Message<PaymentCompleted> message){
    }

文档说( https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.0.M1/spring-cloud-stream.html#_using_streamlistener_annotation )过滤在序列化之前进行。

但在我的情况下,在条件过滤掉它们之前,默认反序列化器(在我的情况下是 io.confluence.kafka.serializers.KafkaAvroDeserializer)由于 microservice_1 的类路径上缺少 PaymentCompleted 而失败。与 microservice_2 和 PaymentStarted 类似。 我不想混合域并将 PaymentStarted 和 PaymentCompleted pojo 保留在 Microservice_1 和 Microservice_2 中。

我尝试用我的自定义反序列化器解决这个问题,看起来可以处理异常,但这确实很棘手。

我也很困惑,我上面提到的文档说过滤在反序列化之前进行。

感谢您的想法/评论。

最佳答案

您确实应该为每种类型使用不同的主题。

但是,请参阅Spring for Apache Kafka's ErrorHandlingDeserializer2 .

这将捕获异常,并且监听器容器会将错误直接路由到容器的错误处理程序。

I am also confused that the doc I mentioned here above says that the filtering out goes before deserializing.

当您使用 native 反序列化时,这并不适用,它在堆栈中的位置要低得多。

关于java - 具有多种消息类型的 Kafka 和主题 (Avro) : Consumer(spring cloud spring fails due to the lack of class on classpath,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58878302/

相关文章:

performance - 1MB 消息的 Redis vs Kafka vs RabbitMQ

Java - 如何在 SpringCloud 和 RabbitMQ 中从单个发布者处进行多个订阅

java - 以编程方式禁用 Android 上的屏幕开/关磁传感器

java - 如何使 Java 语法正确?

java - 无法使用 JavaMail API 编译 JSP 文件

apache-kafka - RocksDb sst 文件的 GUI 查看器

scala - 在 Spark 中以结构化流模式获取 Offset 的消息正在重置

cloud-foundry - Spring Cloud Dataflow 有什么好处?

java - 我可以使用 Spring Cloud Stream 绑定(bind)多个消费者组吗?

java - 如何使用数组在 Java 的一行中创建多个字符串和整数值?