嗨,我已经使用 python 和模块 from kafka import KafkaConsumer
编写了 kafka 消费者应用程序
现在我有如下的 json 字段被使用,
{
"user": "bob",
"src_ip": "45.6.7.2"
}
现在,我如何每 5 或 10 分钟(可配置时间)消费一次消息,然后检查每个用户的 src IP 在给定时间内是否相同。如果不同,那么我必须发送将其保存在数据库中或通过 REST POST 发送到不同的地方。
如何使用 python 消费者应用程序实现此目的?
最佳答案
是的,你可以!要获取消息的时间戳,请尝试 msg.timestamp
。
为了确保每个用户的数据始终路由到同一分区(因此它始终由同一使用者处理),请在以下情况下使用 key=data["user"]
生成消息。
最后,您需要注意,在消费者应用程序的生命周期中,分区分配可能会发生变化。因此,请考虑当消费者在 5 或 10 分钟窗口之一的中间崩溃或丢失其分配时该怎么办。失去上下文重要吗?如果没有,您可能可以为每个消费者使用一个简单的内存数据存储。如果丢失上下文确实很重要,您可能需要考虑使用定期手动偏移提交或使用中央数据存储的替代策略。
关于python - 如何使用python在kafka消费者中创建聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54454504/