我使用 Kafka 0.10,我有一个话题 logs
我的物联网设备将他们的日志发布到 ,我的消息的关键是 device-id
,所以同一个设备的所有日志都在同一个分区中。
我有一个 api /devices/{id}/tail-logs
需要显示调用时一台设备的最后 N 个日志。
目前,我以一种非常低效的方式(但有效)实现了它,因为我从包含设备日志的分区的开头(即最旧的日志)开始,直到达到当前时间戳。
一种更有效的方法是,如果我可以获得当前的最新偏移量,然后向后使用消息(我需要过滤掉一些消息以仅保留我正在寻找的设备的消息)
是否可以用 kafka 做到这一点?如果不是如何解决这个问题? (我会看到一个更重的解决方案是将 kafka-connect 链接到 Elasticsearch ,然后查询 Elasticsearch ,但为此再添加 2 个组件似乎有点矫枉过正......)
最佳答案
当您使用 0.10.2 时,我建议您编写一个 Kafka Streams 应用程序。应用程序将是有状态的,状态将保存每 device-id
的最后 N 条记录/日志——如果新数据写入输入主题,Kafka Streams 应用程序将只更新它的状态(无需重新读取整个主题)。
此外,该应用程序还为您提供请求(“api /devices/{id}/tail-logs
”使用 Interactive Queries 功能。
因此,我不会构建一个必须重新计算每个请求的答案的无状态应用程序,而是构建一个有状态的应用程序,它会急切地为所有可能的请求(即,对于所有 device-id
s)计算结果(并始终自动更新结果) ) 并在请求进来时仅返回已计算的结果。
关于apache-kafka - Kafka以相反的顺序消费消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43401737/