主题包含两种类型的消息:PaymentStarted 和 PaymentCompleted。 有 2 个独立的微服务与消费者。 所以:Microservice_1,有consumer1,它应该获取PaymentStarted类型; Microservice_2,有consumer2,它应该获取PaymentCompleted类型。 此外,Microservice_1 在其 jvm 类路径上仅包含 PaymentStarted,并且 Microservice_2 在其 jvm 类路径上仅包含 PaymentCompleted。
每个服务用于处理不同类型的消息。 我正在使用 spring-cloud-stream,因此在我的 Microservice_1 中,消费者 1 具有:
@StreamListener(target = PayBindings.EVENTS, condition = "headers['eventType']=='com.company.domain.PaymentStarted'")
void onPaymentEvents(Message<PaymentStarted> message){
}
类似地,Microservice_2 消费者 2 具有:
@StreamListener(target = PayBindings.EVENTS, condition = "headers['eventType']=='com.company.domain.PaymentCompleted'")
void onPaymentEvents(Message<PaymentCompleted> message){
}
文档说( https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.0.M1/spring-cloud-stream.html#_using_streamlistener_annotation )过滤在序列化之前进行。
但在我的情况下,在条件过滤掉它们之前,默认反序列化器(在我的情况下是 io.confluence.kafka.serializers.KafkaAvroDeserializer)由于 microservice_1 的类路径上缺少 PaymentCompleted 而失败。与 microservice_2 和 PaymentStarted 类似。 我不想混合域并将 PaymentStarted 和 PaymentCompleted pojo 保留在 Microservice_1 和 Microservice_2 中。
我尝试用我的自定义反序列化器解决这个问题,看起来可以处理异常,但这确实很棘手。
我也很困惑,我上面提到的文档说过滤在反序列化之前进行。
感谢您的想法/评论。
最佳答案
您确实应该为每种类型使用不同的主题。
但是,请参阅Spring for Apache Kafka's ErrorHandlingDeserializer2 .
这将捕获异常,并且监听器容器会将错误直接路由到容器的错误处理程序。
I am also confused that the doc I mentioned here above says that the filtering out goes before deserializing.
当您使用 native 反序列化时,这并不适用,它在堆栈中的位置要低得多。
关于java - 具有多种消息类型的 Kafka 和主题 (Avro) : Consumer(spring cloud spring fails due to the lack of class on classpath,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58878302/