java - Kafka Stream 时间戳提取器在 KGroupedStream 处失败

标签 java apache-kafka streaming apache-kafka-streams

我正在构建一个 Spring Cloud 微服务,它使用来自 kafka 主题的数据。在消费者中,我将主题绑定(bind)到 KStream。由于kafka版本低于0.10,传入消息不包含时间戳。当我解析传入的值时,它工作正常。否则,当我按键对它们进行分组时,它不会使用“default.timestamp.extractor”(已设置为 org.apache.kafka.streams.processor.WallclockTimestampExtractor)。

我已经使用不同版本的kafka(高于或等于0.10)测试了该服务,并且运行良好。

这是我的配置:

Spring : 云: 溪流: 卡夫卡: 流: Binder : 经纪人:${KAFKA_BROKERS} applicationId:电子邮件消息流 配置: default.key.serde:org.apache.kafka.common.serialization.Serdes$StringSerde default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde 提交间隔时间:1000 default.timestamp.extractor:org.apache.kafka.streams.processor.WallclockTimestampExtractor poll.ms: 60000 # 等待更多消息的阻塞时间 buffered.records.per.partition: 2000

我的代码的某些部分:

    stream
        .mapValues(this::mapMessage)
        .groupBy(this::buildGroup, Serialized.with(new JsonSerde<>(Group.class), new JsonSerde<>(EmailMessage.class)))
        .windowedBy(TimeWindows.of(WINDOW_TIME))
        .aggregate(ArrayList::new, this::aggregate, Materialized.with(new JsonSerde<>(Group.class), new MessageListSerialization()))
        .toStream()
        .process(() -> new MailMessagesProcessor(emailService));

它向我抛出此错误:org.apache.kafka.streams.errors.StreamsException:输入记录ConsumerRecord(topic = .....)使用不同的TimestampExtractor来处理此数据。

最佳答案

Kafka Streams 需要代理 0.10.0 或更高版本。它与旧版经纪商不兼容。

  • Kafka Streams 0.10.0 仅与 0.10.0(或更高版本)代理兼容。

  • Kafka Streams 0.10.1 及更高版本向后兼容 0.10.1(但不兼容较旧的代理)并兼容较新的代理。

  • 此外,从 Kafka Streams 1.0 开始,需要消息格式 0.10(或更高)。因此,即使您将代理升级到 0.10.0(或更高版本),如果您的消息格式也没有升级,它也将无法工作。

  • 要使用“exactly-once”功能,需要 0.11.0(或更高版本)的代理版本。

更多详情请参阅:https://docs.confluent.io/current/streams/upgrade-guide.html#compatibility

关于java - Kafka Stream 时间戳提取器在 KGroupedStream 处失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56501567/

相关文章:

c++ - 通过互联网流式传输时出现 live555 问题

Java正则表达式以and开头和结尾

java - 无法在方法之后打印 TestNg 中的语句

java - 正则表达式删除任何 ipv6 地址的前导零

java - SLF4J 与 Jboss 日志记录

apache-kafka - 如何使用 MongoDB 在 Kafka Connect Sink 连接器中获取 kafka 消息的 header

java - 合适的 Apache Kafka 客户端 jar 是什么

java - android:如何将数据写入媒体播放器缓冲区?

apache-kafka - 卡夫卡消费者错误 : Marking coordinator dead

c# - 从视频服务器 c# 流式传输音频