java - 卡夫卡流错误: SerializationException: Size of data received by LongDeserializer is not 8

标签 java apache-kafka apache-kafka-streams

我正在尝试 Kafka Streams。编写一个简单的应用程序,在其中计算重复消息。

消息:

2019-02-27-11:16:56 :: session:prod-111656 :: Msg => Hello World: 2491
2019-02-27-11:16:56 :: session:prod-111656 :: Msg => Hello World: 2492

等等

我正在尝试按 session:prod-xxxx 拆分此类消息。使用它作为 key 。并且 session:prod-xxxx+Hello World: xxxx 使用它作为值。然后按键分组,并查看每个 session 中哪些消息被重复。

代码如下:

KStream<String, String> textLines = builder.stream("RegularProducer");
KTable<String, Long> ktable = textLines.map(
    (String key, String value) -> {
        try {
            String[] parts = value.split("::");
            String sessionId = parts[1];
            String message = ((parts[2]).split("=>"))[1];
            message = sessionId+":"+message;
            return new KeyValue<String,String>(sessionId.trim().toLowerCase(), message.trim().toLowerCase());
        } catch (Exception e) {
            return new KeyValue<String,String>("Invalid-Message".trim().toLowerCase(), "Invalid Message".trim().toLowerCase());
        }
    })
    .groupBy((key,value) -> value)
    .count().filter(
            (String key, Long value) -> {
                return value > 1;
            }
    );

ktable.toStream().to("RegularProducerDuplicates", 
Produced.with(Serdes.String(), Serdes.Long()));
Topology topology = builder.build();
topology.describe();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

KTable 主题 RegularProducerDuplicates 已生成。但是当我使用console-consumer查看它时,它因错误而崩溃。然后我在控制台消费者上使用 --skip-message-on-error 标志。现在我看到了数千行这样的行

session:prod-111656 : hello world: 994  [2019-02-28 16:25:18,081] ERROR Error processing message, skipping this message:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

谁能帮我看看这里出了什么问题吗?

最佳答案

您的 Kafka Streams 应用程序正常并且工作正常。

错误位于 kafka-console-consumer(kafka.tools.ConsoleConsumer 是实现脚本逻辑的类)。

反序列化期间无法正确处理 null。当它获取 null 作为消息的值或键时,它会设置默认值(表示 null 字符串的字节数组)。如果你检查源代码,你可以找到以下函数

def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte]) {
  val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
  val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.
    getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)
  output.write(convertedBytes)
}

当它获取为 null ( sourceBytes==null ) 的 sourceBytes 进行反序列化时,您如何看到它设置了默认值:

val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))

在您的情况下,它是 "null".getBytes(StandardCharsets.UTF_8) 。然后,尝试使用 org.apache.kafka.common.serialization.LongDeserializer(您的值反序列化器)进行反序列化。 LongDeserializer 从一开始就检查字节数组的大小。现在它是 4(null 的字节表示)并且抛出异常。

例如,如果您使用 StringDeserializer,它不会正确反序列化它,但至少不会抛出异常,因为它不会检查字节数组的长度。

长话短说:ConsoleConsumer的格式化程序,负责打印,为了 pretty-print 设置一些默认值,一些反序列化器(LongDeserializer、IntegerDeserializer)无法处理该默认值

关于为什么您的应用程序为某些键生成 null 值:

KTable:filterKStream::filter 具有不同的语义。根据 KTable 的 javadoc:

for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.

对于您的 filter ,当 count <= 1 时,它会传递键的 null 值。

关于java - 卡夫卡流错误: SerializationException: Size of data received by LongDeserializer is not 8,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54924068/

相关文章:

java - jconsole/visualvm如何知道本地运行的java应用程序

java - 对几种方法计算运行时间分析

apache-kafka - Kafka设置默认保留日志并测试

ubuntu - 如何在Ubuntu系统启动时自动启动Kafka?

apache-kafka - Kafka Stream Suppress 在窗口化后不产生输出

apache-kafka - RocksDb sst 文件的 GUI 查看器

java - 人们是否提供多种机制来在 API 中做同样的事情?

java - 有没有人对游戏编程中的作业有任何想法?

apache-kafka - 使用kafka为clickhouse生产数据

go - 使用 sarama 编写 Kafka 制作人时无效的时间戳