python-3.x - 使用替换的字段值查询后将系列转储回 InfluxDB

标签 python-3.x influxdb influxdb-python

场景

我想通过查询 InfluxDB 的测量结果将数据发送到 MQTT 代理(云)。

我在架构中有一个名为status字段。它可以是 10status=0 表示该系列尚未发送到云端。如果我收到 MQTT 代理的确认,那么我希望使用 status=1 将查询重写回数据库。

FAQs for InfluxDB regarding Duplicate data 中所述如果信息与之前的查询具有相同的时间戳,但具有不同的字段值=>,则将显示更新字段。

为了测试这一点,我创建了以下内容:

CREATE DATABASE dummy
USE dummy
INSERT meas_1, type=t1, status=0,value=123 1536157064275338300

查询:

SELECT * FROM meas_1

提供

time                status type value         
1536157064275338300 0      t1   234      

现在,如果我想覆盖该系列,我会执行以下操作:

INSERT meas_1, type=t1, status=1,value=123 1536157064275338300                                                                       

这将覆盖该系列

 time                status type value         
 1536157064275338300 1      t1   234     

(注意:通过当前 InfluxDB 中的标签不可能实现这一点)

用法

  1. 使用客户端通过"status"=0查询一些信息。
  2. 重构 JSON 以发送到云端
  3. 将信息发送到云端
  4. 如果成功,则将第 1 步的输出写回数据库,但带有 status=1

我正在使用InfluxDBClient Python3创建应用程序(MQTT + InfluxDB)

write_points API内有一个参数提到 batch_size 需要 int 作为输入。

我不确定如何将其与我想要的应用程序一起使用。有人可以指导我这个或数据库的架构,以便我可以将实际和非冗余信息上传到云吗?

最佳答案

batch_size 实际上是需要传递给 write_points 的测量列表的长度。

步骤

  1. 创建客户端并根据测量结果进行查询(这里,我们查询 GPS 信息)

    client = InfluxDBClient(database='dummy')
    
    op = client.query('SELECT * FROM gps WHERE "status"=0', epoch='ns')
    
  2. ResultSet 放入列表中:

     batch = list(op.get_points('gps'))
    
  3. 创建一个空列表以进行更新

     updated_batch = []
    
  4. 解析每个测量结果并将status标志更改为1。注意,InfluxDB 中的默认值是 float

       for each in batch:
    new_mes = {
    'measurement': 'gps',
    'tags': {
    'type': 'gps'
    },
    'time': each['time'],
    'fields': {
      'lat': float(each['lat']),
      'lon': float(each['lon']),
      'alt': float(each['alt']),
      'status': float(1)
    }
    }
    updated_batch.append(new_mes)
    
  5. 最后通过客户端转储点,并将 batch_size 作为 updated_batch 的长度

    client.write_points(updated_batch, batch_size=len(updated_batch))
    

这会覆盖该系列,因为它包含相同的时间戳,且 status 字段设置为 1

关于python-3.x - 使用替换的字段值查询后将系列转储回 InfluxDB,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52188429/

相关文章:

python - 在 Python 中读取阿拉伯文文件

python - 如何计算列表列表并返回 float 列表

python - 将具有多个标签的值写入 influxDB(使用 python)

python - 使用带有 Line 协议(protocol)的 Python 客户端将数据插入 Influxdb

python - 在 InfluxDB-Python 中使用变更数据捕获?

python - 值错误 : Unknown loss function:focal_loss_fixed when loading model with my custom loss function

python-3.x - 我的压力传感器不会输出完整范围的值,使用 Raspberry Pi 3 和 Python3

influxdb - 流入QL : Influx DB conversion of a field from String to integer

Influxdb 在 Influxdb 内的数据库之间移动复制数据

python-3.x - 将带有标签的 pandas DF 写入 influxdb