python - 如何使用python在kafka消费者中创建聚合

标签 python apache-kafka kafka-python

嗨,我已经使用 python 和模块 from kafka import KafkaConsumer 编写了 kafka 消费者应用程序

现在我有如下的 json 字段被使用,

{
  "user": "bob",
  "src_ip": "45.6.7.2"
 }

现在,我如何每 5 或 10 分钟(可配置时间)消费一次消息,然后检查每个用户的 src IP 在给定时间内是否相同。如果不同,那么我必须发送将其保存在数据库中或通过 REST POST 发送到不同的地方。

如何使用 python 消费者应用程序实现此目的?

最佳答案

是的,你可以!要获取消息的时间戳,请尝试 msg.timestamp

为了确保每个用户的数据始终路由到同一分区(因此它始终由同一使用者处理),请在以下情况下使用 key=data["user"]生成消息。

最后,您需要注意,在消费者应用程序的生命周期中,分区分配可能会发生变化。因此,请考虑当消费者在 5 或 10 分钟窗口之一的中间崩溃或丢失其分配时该怎么办。失去上下文重要吗?如果没有,您可能可以为每个消费者使用一个简单的内存数据存储。如果丢失上下文确实很重要,您可能需要考虑使用定期手动偏移提交或使用中央数据存储的替代策略。

关于python - 如何使用python在kafka消费者中创建聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54454504/

相关文章:

apache-kafka - 在使用 kafka-python 时定期轮询 Kafka 消费者的最佳方法是什么?

python - 减去 Pandas 时间戳;绝对值

python - Kafka 10 - 具有身份验证和授权的 Python 客户端

python - 卡住函数参数

java - 线程 "main"org.apache.kafka.streams.errors.InvalidStateStoreException : 中出现异常

java - 为什么 Apache Kafka 消费者不使用 Log4j2 根记录器?

docker - 我们可以使用在生产中使用docker化的Apache Kafka吗?

apache-spark - 如何将数据从 Kafka 传递到 Spark Streaming?

python - Pandas 列的矢量化 "and"

java - 从 Java 调用继承的类方法