java - Kafka Java SimpleConsumer奇怪的编码

标签 java apache-kafka kafka-consumer-api

我正在尝试使用 Kafka 9 中的 SimpleConsumer 来允许用户从一个时间偏移量重播事件 - 但我从 Kafka 收到的消息采用一种非常奇怪的编码:

7icf-test-testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7\�W>8������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819330373,"context":{"userid":0,"username":"testUser"}}�!}�a�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819331637,"context":{"userid":1,"username":"testUser"}}���r�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819332754,"context":{"userid":2,"username":"testUser"}}��������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819333868,"context":{"userid":3,"username":"testUser"}}�p=
                            ������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819334997,"context":{"userid":4,"username"

使用 KafkaConsumer 可以很好地解析此消息。这是我使用 SimpleConsumer 检索消息的代码:

    for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
        long currentOffset = messageAndOffset.offset();
        if (currentOffset < readOffset) {
            log.debug("Found an old offset - skip");
            continue;
        }

        readOffset = messageAndOffset.nextOffset();

        int payloadOffset = 14 + messageAndOffset.message().keySize(); // remove first x bytes, schema Id
        byte[] data = messageAndOffset.message().payload().array();
        byte[] realData = Arrays.copyOfRange(data, payloadOffset, data.length - payloadOffset);
        log.debug("Read " + new String(realData, "UTF-8"));
}

在我不断收到有关字节过高的 UTF-32 错误后,我添加了代码以跳过前 x 个字节,我认为这是因为 Kafka 将消息大小等信息添加到负载中。这是 Avro 工件吗?

最佳答案

我从来没有找到一个很好的答案 - 但我转而使用 SimpleConsumer 查询 Kafka 的偏移量 我需要的(每个分区......虽然实现很差),然后使用使用 seek(TopicPartition, offset)seekToBeginning(TopicPartition) 的原生 KafkaConsumer 检索消息。希望他们会在下一个版本中为 native 客户端添加从给定时间戳检索消息的能力。

关于java - Kafka Java SimpleConsumer奇怪的编码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37598660/

相关文章:

java - 如何从一种方法返回 ArrayList 和文件数组?

apache-kafka - kafka消费者接收开销?

java - 设置并发> 1后spring kafka thorws InstanceAlreadyExistsException异常

java - 动态 if-then 代码

最终好奇的Java未初始化变量

java - 将未指定范围的数字存储到数组中

nullpointerexception - NPE 同时反序列化 kafka 流中的 avro 消息

apache-kafka - Kafka 消费者不是从最新消息开始

scala - Kafka scala Consumer代码——打印消费记录

apache-spark - kafka max.poll.records 在 Spark 流中不起作用