apache-kafka-streams - Kafka Stream 使用 JoinWindow 进行数据重放

标签 apache-kafka-streams kafka-join

我有 2 个数据流,我希望能够在 1 个月的窗口中加入它们。当我拥有实时数据时, 一切都变得有趣且 super 简单。 KStream 加入 .我做了这样的事情;

KStream<String, GenericRecord> stream1 =
            builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic1());

KStream<String, GenericRecord> stream2 =
            builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic2());

long joinWindowSizeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days

    KStream<String, GenericRecord> joinStream = stream1.join(stream2,
            new ValueJoiner<GenericRecord, GenericRecord, GenericRecord>() {
                @Override
                public GenericRecord apply(GenericRecord genericRecord, GenericRecord genericRecord2) {
                    final GenericRecord jonnedRecord = new GenericData.Record(jonnedRecordSchema);
                    ....
                    ....
                    ....

                    return jonnedRecord;
                }
            }, JoinWindows.of(joinWindowSizeMs));

当我想进行数据重放时出现问题。假设我想为过去 6 个月的数据重新进行这些连接,因为我正在同时运行所有数据的管道 kafkaStream 将连接所有可连接的数据并且它没有考虑时间差(这它应该只加入过去一个月的数据)。我假设 JoinWindow 时间是我们将数据插入 Kafka 主题的时间,对吗?
以及我如何更改和操作这个时间以便我可以正确运行我的数据重放,我的意思是重新插入过去 6 个月的数据,每条记录应该花费一个月的时间窗口并基于该记录加入。

此问题与 How to manage Kafka KStream to Kstream windowed join? 不重复,在那里我询问了如何根据时间窗口加入。这里我说的是数据重放。根据我在加入Kafka期间将数据插入主题的时间作为JoinWindow的时间,因此如果您想进行数据重放并重新插入6个月前的数据,kafka将其作为新数据今天插入并加入一些实际上今天不应该使用的其他数据。

最佳答案

Kafka 的 Streams API 使用 TimestampExtractor 返回的时间戳计算连接。默认情况下,这是记录的嵌入元数据时间戳。 (参见 http://docs.confluent.io/current/streams/concepts.html#time)

默认情况下,KafkaProducer将此时间戳设置为写入时的当前系统时间。 (作为替代方案,您可以在每个主题的基础上配置代理,以在代理存储记录时使用代理的系统时间覆盖生产者提供的记录时间戳——这提供了“摄取时间”语义。)

因此,这本身不是 Kafka Streams 问题。

有多种选择可以解决这个问题:

  • 如果您的数据已经在一个主题中,您可以简单地重置您的 Streams 应用程序以重新处理旧数据。为此,您可以使用应用程序重置工具 ( bin/kafka-streams-application-reset.sh )。您还需要指定 auto.offset.reset政策到 earliest在您的 Streams 应用中。查看文档 - 另外,建议阅读博客文章。
  • http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
  • https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

  • 这是最好的方法,因为您不需要再次向主题写入数据。
  • 如果您的数据不在主题中并且您需要写入数据,您可以在应用程序级别显式设置记录时间戳,为每条记录提供时间戳:

  • KafkaProducer producer = new KafkaProducer(...);
    producer.send(new ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value));
    

    因此,如果您摄取旧数据,您可以显式设置时间戳,Kafka Streams 将接收它并相应地计算连接。

    关于apache-kafka-streams - Kafka Stream 使用 JoinWindow 进行数据重放,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41811539/

    相关文章:

    apache-kafka-streams - Kafka Streams 构建 StateStoreSupplier : API clarifications

    java - 无法在kafka流中加载类 "org.slf4j.impl.StaticLoggerBinder"?

    java - 卡夫卡流: Flushing intermediate Windowed results as commit interval and window time are not in sync

    apache-kafka - 如何从 KTable 中获取排序后的 KeyValueStore?

    java - kafka 流消息中的总字数

    java - KStream 到 KTable 左连接返回 Null