python - 卡夫卡与 python : How to send topic to postgreSQL?

标签 python python-3.x postgresql apache-kafka

我被敦促将 Kafka 与 python 一起使用。此外,我需要开发一个非常简单的生产者-消费者应用程序,该应用程序实时从设备读取指标,然后将它们发布到 Kafka 中的主题“指标”。然后消费者必须订阅“指标”主题并将这些数据存储到 postgreSQL 数据库。
我试图在这里绘制架构:

           +-----------+        Fetch metrics every 1 second          +--------------+                                           
           |Biometric  |     {heartrate, oxygen level, temprature}     |              |                                           
           |generation ------------------------------------------------  producer.py |                                           
           |device     |                                              |              |                                           
           +-----------+                                              +-------|------+                                           
                                                                              |                                                  
                                                                              |                                                  
                                                                              |                                                  
                                                                              |Publish metrics in "metrics" topic, every 1 second
                                                                              |{heartrate, oxygen level, tempature}              
                                                                              |         JSON format                              
                                                                              |                                                  
                                                                              |                                                  
                                                                      +-------|------+                                           
                                                                      |              |                                           
                                                                      |    KAFKA     |                                           
                                                                      |              |                                           
                                                                      +-------|------+                                           
                                                                              |                                                  
                                                                              |                                                  
                                                                              |                                                  
                                                                              |                                                  
                                                                              | Subscribe to "metrics" topic and fetch           
                 -                                                            | the JSON every 1 second                          
                                                                              |                                                  
          +-------------+                                              +------|------+                                           
          |             |            Send data to postgreSQL           |             |                                           
          | postgreSQL  ------------------------------------------------ consumer.py |                                           
          |             |                                              |             |                                           
          +-------------+                                              +-------------+                                           
现在,这就是我(零 Kafka 经验)想象这个应用程序的方式。我设法把一切都交给了消费者。
我现在很容易连接到 postgreSQL 数据库并将这些数据发送给它。
但我很困惑。我到处都读到,与此类数据库的连接必须通过 Kafka 连接器 (?) 进行。只是将我在消费者中收到的数据手动发送到 postgres 是错误的吗?为什么我要在这里使用“Kafka 连接器”?最后,我不知道有任何 python kafka 连接器,这对我来说更加复杂。
有人可以帮我清理一下吗?

最佳答案

如果你想将数据以 JSON 格式推送到 kafka 我最近在 here 上写了一个简单的例子.
您还可以找到 kafka python docs
对于 Kafka -> PostgreSQL 连接,您可能需要使用 Kafka Connect JDBC下沉。 Kafka Connect 是一系列预先构建的连接器,允许您通过编写配置文件从 Kafka 推送或拉取(kafka 连接术语中的源或接收器)数据,而无需一遍又一遍地编码或重新发明轮子.
Kafka 连接不依赖于语言,因为您只需要在 Kafka 环境中部署它并正确设置配置文件。
请注意,如果您打算使用 Kafka connect 将数据推送到 PostgreSQL,您可能需要

  • 以 AVRO 格式创建源流
  • 将架构规范添加到您的 JSON 消息中(更多信息 here
  • 关于python - 卡夫卡与 python : How to send topic to postgreSQL?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66167890/

    相关文章:

    python - Airbnb Airflow 与 Apache Nifi

    python - Pandas 将 "\r"添加到 csv 文件

    sql - 选择表中不存在的 ID

    python - 如何在不创建任何新变量的情况下在字典理解中解压缩元组的值?

    python - 如何检查文本文件中存储的数据是否符合特定条件?

    sql - TimeScaleDB 物化行太大

    database - PostgreSQL - 事务确保原子性吗?

    python - 使用条件结果列连接 PySpark 数据框

    python - 将两个不同的列合并为一列(说明是 name )

    python - psycopg2 "IndexError: tuple index out of range"使用带有参数元组的 '%' 之类的运算符时出错