apache-kafka - Kafka以相反的顺序消费消息

标签 apache-kafka kafka-consumer-api

我使用 Kafka 0.10,我有一个话题 logs我的物联网设备将他们的日志发布到 ,我的消息的关键是 device-id ,所以同一个设备的所有日志都在同一个分区中。

我有一个 api /devices/{id}/tail-logs需要显示调用时一台设备的最后 N 个日志。

目前,我以一种非常低效的方式(但有效)实现了它,因为我从包含设备日志的分区的开头(即最旧的日志)开始,直到达到当前时间戳。

一种更有效的方法是,如果我可以获得当前的最新偏移量,然后向后使用消息(我需要过滤掉一些消息以仅保留我正在寻找的设备的消息)

是否可以用 kafka 做到这一点?如果不是如何解决这个问题? (我会看到一个更重的解决方案是将 kafka-connect 链接到 Elasticsearch ,然后查询 Elasticsearch ,但为此再添加 2 个组件似乎有点矫枉过正......)

最佳答案

当您使用 0.10.2 时,我建议您编写一个 Kafka Streams 应用程序。应用程序将是有状态的,状态将保存每 device-id 的最后 N 条记录/日志——如果新数据写入输入主题,Kafka Streams 应用程序将只更新它的状态(无需重新读取整个主题)。

此外,该应用程序还为您提供请求(“api /devices/{id}/tail-logs ”使用 Interactive Queries 功能。

因此,我不会构建一个必须重新计算每个请求的答案的无状态应用程序,而是构建一个有状态的应用程序,它会急切地为所有可能的请求(即,对于所有 device-id s)计算结果(并始终自动更新结果) ) 并在请求进来时仅返回已计算的结果。

关于apache-kafka - Kafka以相反的顺序消费消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43401737/

相关文章:

apache-kafka - org.apache.kafka.connect.errors.DataException : Converting byte[] to Kafka Connect data failed due to serialization error of topic

java - ConcurrentKafkaListenerContainerFactory 的 Spring-Kafka 消费者组协调

java - 使用 Kafka-Spark Streaming API 处理流数据时进行重复

elasticsearch - 如何将 Kafka 批量读取到 Elasticsearch

nginx - 允许通过nginx访问kafka

apache-kafka - 如何修复 : java. lang.OutOfMemoryError:flink kafka 消费者中的直接缓冲内存

apache-kafka - Kafka 消费者获取主题元数据失败

kafka-consumer-api - kafka消费者迭代器是如何工作的

hadoop - 如何将 Kafka 主题加载到 HDFS?

oracle11g - 用于 Oracle 数据库源的 Kafka 连接器