java - 如何在消费消息时访问 Kafka header ?

标签 java spring spring-integration apache-kafka

下面是我的配置

<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
            kafka-consumer-context-ref="consumerContext"
            auto-startup="true"
            channel="inputFromKafka">
        <int:poller fixed-delay="1" time-unit="MILLISECONDS" />
    </int-kafka:inbound-channel-adapter>

inputFromKafka经过下面的转换

public Message<?> transform(final Message<?> message) {

System.out.println( "KAFKA Message Headers " + message.getHeaders());

final Map<String, Map<Integer, List<Object>>> origData =  (Map<String, Map<Integer, List<Object>>>) message.getPayload();

        // some code to figure-out the nonPartitionedData
        return MessageBuilder.withPayload(nonPartitionedData).build();
    }

不管怎样,上面的打印语句只打印两个一致的标题

KAFKA 消息头 {id=9c8f09e6-4b28-5aa1-c74c-ebfa53c01ae4, timestamp=1437066957272}

在发送 Kafka 消息时传递了一些 header ,包括 KafkaHeaders.MESSAGE_KEY 但我也没有回复,想知道​​是否有办法完成此操作?

最佳答案

不幸的是,它不能那样工作......

Producer部分 ( KafkaProducerMessageHandler) 如下所示:

this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload());

如您所见,我们不发送任何 messageHeaders到卡夫卡topic .只有payload就在那messageKey之下由 Kafka 协议(protocol)指定。

从另一边Consumer side ( KafkaHighLevelConsumerMessageSource ) 执行以下逻辑:

if (!payloadMap.containsKey(messageAndMetadata.partition())) {
    final List<Object> payload = new ArrayList<Object>();
    payload.add(messageAndMetadata.message());
    payloadMap.put(messageAndMetadata.partition(), payload);
}

如您所见,我们不关心 messageKey .

KafkaMessageDrivenChannelAdapter ( <int-kafka:message-driven-channel-adapter> ) 适合你!它在将消息发送到 channel 之前执行此操作:

KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(generateMessageId, generateTimestamp);

Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
rawHeaders.put(KafkaHeaders.MESSAGE_KEY, key);
rawHeaders.put(KafkaHeaders.TOPIC, metadata.getPartition().getTopic());
rawHeaders.put(KafkaHeaders.PARTITION_ID, metadata.getPartition().getId());
rawHeaders.put(KafkaHeaders.OFFSET, metadata.getOffset());
rawHeaders.put(KafkaHeaders.NEXT_OFFSET, metadata.getNextOffset());

if (!this.autoCommitOffset) {
    rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment);
}

关于java - 如何在消费消息时访问 Kafka header ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31461154/

相关文章:

javascript - Javafx WebEngine 读取安全 cookie

java - 可编辑 JTable 教程

java - 在 Spring REST 服务中优雅地处理无效文件上传

java - 使用Gradle导入spring-integration-kafka时出错

java - Swing:MouseListener 仅在 ImageIcon 上而不是 JLabel 上

java - 哈希码返回整数值

java - Spring中HTTP请求后是否需要清除MDC

java - 使用带有 Tomcat 的 Jersey/Spring 在 REST 中注入(inject)时出错

java - Spring Integration http 出站网关 POST 响应不包含转义的符号

java - 为什么我的 main 方法一直在运行?