apache-kafka - 从 Kafka 检索基于时间戳的数据

标签 apache-kafka

如何从 Kafka 集群获取指定日期的消息或数据。例如 9 月 13 日,任何人都可以为此提供代码。我用谷歌搜索了一下,只找到了理论,但我想要代码

最佳答案

没有为此的访问方法。另外,在 Kafka v0.10 之前,消息不包含任何时间戳信息,因此无法知道消息何时写入主题。

从 Kafka v0.10 开始,每条消息都包含一个元数据时间戳属性,该属性由生产者在消息创建时间设置,或由代理在消息插入时间设置。计划建立基于时间的索引,但尚未可用。因此,您需要使用整个主题并检查时间戳字段(并忽略您不感兴趣的所有消息)。要找到开头,您还可以对偏移量和时间戳进行二分搜索,以更快地找到第一条消息。

更新:

Kakfa 0.10.1 添加了基于时间的索引。它允许seek到时间戳等于或大于给定时间戳的第一条记录。您可以通过 KafkaConsumer#offsetsForTime() 来使用它。这将返回相应的偏移量,您可以将它们输入到 KafkaConsumer#seek() 中。您可以仅使用数据并通过 ConsumerRecord#timestamp() 检查记录时间戳字段,以查看何时可以停止处理。

请注意,数据严格按偏移量排序,而不是按时间戳排序。因此,在处理过程中,您可能会得到时间戳小于开始时间戳的“迟到”记录(不过您可以简单地跳过这些记录)。

一个更困难的问题是在搜索间隔结束时迟到的记录。在您获得时间戳大于搜索间隔的第一个时间戳后,稍后可能仍存在带有时间戳的记录,这些记录是您搜索间隔的一部分(如果这些记录确实被附加到“迟到”主题)。但没有办法知道这一点。因此,您可能想继续阅读“更多”记录并检查是否有“迟到”记录。 “一些记录”意味着多少,是您需要自己做出的设计决定。

不过,没有通用指南 - 如果您对“写入模式”有更多了解,它可以帮助您定义一个好的策略,确定在搜索间隔“结束”后您想要使用多少条记录。当然有两种默认策略:(1)在时间戳大于搜索间隔的第一个记录处停止(并有效地忽略任何迟到的记录 - 如果您使用“日志附加时间”配置,这当然是一个安全的策略); (2) 你读到日志的末尾——就完整性而言,这是最安全的策略,但可能会导致过高的开销(另请注意,因为记录可以随时附加,并且如果记录“延迟”可能任意大,到达日志末尾后,甚至可能会追加一条迟到的记录)。

在实践中,考虑“最大预期延迟”并进行读取可能是个好主意,直​​到获得时间戳大于此延迟上限的记录为止。

关于apache-kafka - 从 Kafka 检索基于时间戳的数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39514167/

相关文章:

elasticsearch - GCP/Dataproc 上 Kafka、Spark、Elasticsearch 堆栈的吞吐量

apache-kafka - 用于时间序列数据持久性的 Apache Kafka

java - Azure 事件中心与 Spring Cloud Stream Kafka 的连接问题

java - 连续运行 Java 程序的标准方法

java - 如何在同一个盒子上运行多个相互独立的kafka消费者?

java - 我想通过log4j记录STORM日志并将日志发送到kafka。结果是kafka STORM可以自己记录日志log4j但不能发送日志到kafka

spring-boot - Spring Cloud Stream 动态 channel

java - 使用 Apache Beam 将 Kafka Stream 输出写入多个目录

apache-kafka - 出于队列目的使用 Kafka 而不是 Redis

java - 运行 JavaKafkaWordCount.java 时出现错误