java - Spring 卡夫卡: different json payload on the same topic

标签 java spring apache-kafka spring-integration spring-kafka

我需要在同一个 Kafka 主题(例如 Foo、Bar、Car ...)上发送不同的 JSON 负载,而不使用父类

基于 spring kafka 文档,我可以在类级别使用 @KafkaListener 并在方法级别指定 @KafkaHandler ( doc )

@KafkaListener(topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(Foo foo) {
        ...
    }

    @KafkaHandler
    public void listen(Bar bar) {
        ...
    }

    @KafkaHandler
    public void listen(Car car) {
        ...
    }
}

但我得到了这个异常(exception):

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: ConsumerRecord(topic = myTopic, partition = 1, offset = 0, CreateTime = 1508859519287, checksum = 3297149058, serialized key size = -1, serialized value size = 13, key = null, value = {"foo":"foo"})
org.springframework.kafka.KafkaException: No method found for class java.util.LinkedHashMap
    at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:92)
    at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:147)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:60)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:570)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)

我也尝试过这种方法,但它不起作用:

   @KafkaListener(topics = "myTopic")
   public void receive(ConsumerRecord<String, ?> payload)
   {
      if (payload.value() instanceof Foo)
      {
         //
      }
      else if (payload.value() instanceof Car)
      {
         //
      }
   }

如何配置我的生产者和消费者使用 spring kafka 在同一主题上发送不同的 JSON 负载?

最佳答案

使用 JSON 和多方法监听器时有一个 catch-22;我们需要知道类型才能路由到正确的方法;我们无法从方法签名推断出转换的类型,因为我们还不知道要调用哪个方法。

您需要一个消息转换器,可以通过数据或使用 header (使用 Kafka 11)来确定类型。

然后,在转换器转换为正确的类型之后,我们就可以确定要调用哪个方法。

第二次尝试时,ConsumerRecord 将包含原始 JSON;您将需要使用 ObjectMapper 来进行转换,但同样,您将需要一些关于您想要转换为什么类型的提示(或者使用暴力进行转换,直到成功)。

关于java - Spring 卡夫卡: different json payload on the same topic,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46915365/

相关文章:

java - Spring 应用程序中的 Okta 自动注销

java - 编写java包装器

apache-spark - 如何获取 Kafka 偏移量以进行结构化查询以进行手动可靠的偏移量管理?

java - 断言失败错误 : <class> has no public constructor

java - 设置外观时序列化 AbstractTableModel 失败

java - CAS:用户以其他用户身份登录

azure - 从浏览器端连接到 Azure IoT/事件中心

java - 如何将 int 值与多个字符串值进行比较。所以它适用于数据库。值(value)转化失败

java - Spring-batch - 事务不正常 - 我如何找出问题所在?

apache-kafka - 卡夫卡流 : Consumer commit frequency