java - org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.Integer]

Tags: java apache-kafka kafka-consumer-api spring-kafka

My cluster configuration, class details, and jar versions are described in the question "org.apache.kafka.common.KafkaException: Failed to construct kafka consumer".

I have started the Zookeeper server, the Kafka server, and the Kafka REST server. Next, I deploy the Spring Boot war file, spring-kafka-webhook-service.war, on Tomcat.

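When I publish a message through the Kafka REST proxy client, I get the error below, which suggests that the @KafkaListener method cannot read the incoming ConsumerRecord message. Any input would be greatly appreciated.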

My Kafka REST properties are currently configured as follows:

confluent-3.3.0/etc/kafka-rest/kafka-rest.properties

id=kafka-rest-test-server
schema.registry.url=http://localhost:8081
zookeeper.connect=localhost:2181

Error log after deploying the war on Tomcat:

2017-12-26 09:11:01.143 ERROR 20430 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = inventory, partition = 0, offset = 3, CreateTime = 1514279460946, checksum = 1183108784, serialized key size = -1, serialized value size = 72, key = null, value = InventoryEvent [id=7798, eventType='inventory.transaction', qtyReq='5', qtyLevel='27'])

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.psl.kafka.spring.InventoryEventReceiver.listenWithHeaders(com.psl.kafka.spring.InventoryEvent,java.lang.String,java.lang.Integer,int,java.lang.String)]
Bean [com.psl.kafka.spring.InventoryEventReceiver@798267fb]; nested exception is org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.Integer], failedMessage=GenericMessage [payload=InventoryEvent [id=7798, eventType='inventory.transaction', qtyReq='5', qtyLevel='27'], headers={kafka_offset=3, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:183) ~[spring-kafka-1.1.7.RELEASE.jar:na]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.7.RELEASE.jar:na]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.7.RELEASE.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:792) [spring-kafka-1.1.7.RELEASE.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:736) [spring-kafka-1.1.7.RELEASE.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:568) [spring-kafka-1.1.7.RELEASE.jar:na]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.Integer]
        at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:100) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:103) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:135) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-1.1.7.RELEASE.jar:na]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:174) ~[spring-kafka-1.1.7.RELEASE.jar:na]
        ... 8 common frames omitted

Best answer

With @KafkaListener, use a method that takes only the POJO "InventoryEvent" as its parameter:

InventoryEvent event

instead of

@Payload InventoryEvent event,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) String offset

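This solved the problem, because kafka_receivedMessageKey is never sent through Kafka, as described in this SO answer by Artem Bilan: https://stackoverflow.com/a/32125453/786676. In the log above the record has key = null, so the kafka_receivedMessageKey header is null and the required Integer parameter is reported as missing.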
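
The failure mode can be illustrated with a small plain-Java sketch. It assumes that a header whose value is null is treated as missing when the parameter is required, which is consistent with the log above (the header map contains kafka_receivedMessageKey=null, yet the exception says the header is missing). The class and method names are hypothetical and do not reproduce Spring's implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class HeaderResolutionSketch {

    // Hypothetical stand-in for a header lookup; NOT Spring's actual resolver code.
    // A null value is handled the same way as an absent header.
    static Integer resolveHeader(Map<String, Object> headers, String name, boolean required) {
        Object value = headers.get(name); // null if absent OR if mapped to null
        if (value == null) {
            if (required) {
                throw new IllegalStateException("Missing header '" + name + "'");
            }
            return null; // optional parameter: pass null through to the listener
        }
        return (Integer) value;
    }

    // Headers as they appear in the failed message from the log.
    static Map<String, Object> headersFromLog() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("kafka_offset", 3L);
        headers.put("kafka_receivedMessageKey", null); // record was published without a key
        headers.put("kafka_receivedPartitionId", 0);
        headers.put("kafka_receivedTopic", "inventory");
        return headers;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = headersFromLog();

        // Optional lookup: the null key is tolerated.
        System.out.println("optional key = "
                + resolveHeader(headers, "kafka_receivedMessageKey", false));

        // Required lookup: fails in the same way as the listener in the question.
        try {
            resolveHeader(headers, "kafka_receivedMessageKey", true);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In a real listener, an alternative to dropping the header parameters is to mark the key as optional, for example @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key. The required attribute of Spring's @Header annotation defaults to true.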

Regarding "java - org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.Integer]", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/47976411/
