python-3.x - 使用influxdb-python的Influxdb批量插入

标签 python-3.x redis influxdb influxdb-python redis-streams

我使用influxDB-Python插入了从Redis-Stream读取的大量数据。因为Redis-stream并设置maxlen = 600且数据以100ms的速度插入,所以我需要保留其所有数据。所以我读取并将其传输到 influxDB(我不知道哪个数据库更好),但是使用批处理仅插入⌈count/batch_size⌉数据,在每个batch_size的末尾,似乎都是覆盖。以下代码

import redis
from apscheduler.schedulers.blocking import BlockingScheduler
import time
import datetime

import os
import struct
from influxdb import InfluxDBClient

def parse(datas):
    ts,data = datas
    w_json = {
    "measurement": 'sensor1',
    "fields": {
        "Value":data[b'Value'].decode('utf-8')
        "Count":data[b'Count'].decode('utf-8')
        }
    }
    return w_json

def archived_data(rs,client):
    results= rs.xreadgroup('group1', 'test', {'test1': ">"}, count=600)
    if(len(results)!=0):
        print("len(results[0][1]) = ",len(results[0][1]))
        datas = list(map(parse,results[0][1]))
        client.write_points(datas,batch_size=300)
        print('insert success')
    else:
        print("No new data is generated")

if __name__=="__main__":
    try:
        rs = redis.Redis(host="localhost", port=6379, db=0)
        rs.xgroup_destroy("test1", "group1")
        rs.xgroup_create('test1','group1','0-0')
    except Exception as e:
        print("error = ",e)
    try:
        client = InfluxDBClient(host="localhost", port=8086,database='test')
    except Exception as e:
        print("error = ", e)
    try:
        sched = BlockingScheduler()
        sched.add_job(test1, 'interval', seconds=60,args=[rs,client])
        sched.start()
    except Exception as e:
        print(e)
InfluxDB的数据更改如下
> select count(*) from sensor1;
name: sensor1
time count_Count count_Value
---- ----------- -----------
0    6           6
> select count(*) from sensor1;
name: sensor1
time count_Count count_Value
---- ----------- -----------
0    8           8

> select Count from sensor1;
name: sensor1
time                Count
----                -----
1594099736722564482 00000310
1594099737463373188 00000610
1594099795941527728 00000910
1594099796752396784 00001193
1594099854366369551 00001493
1594099855120826270 00001777
1594099913596094653 00002077
1594099914196135122 00002361

为什么数据似乎被覆盖,如何解决一次插入所有数据的问题?
如果您能告诉我如何解决它,我将不胜感激。

最佳答案

您能否提供要存储在Influx DB中的数据结构的更多详细信息?
但是,希望以下信息对您有所帮助。
在Influxdb中,时间戳+标签是唯一的(即两个具有相同标签值和时间戳的数据点不能存在)。与SQL influxdb不会引发唯一约束冲突不同,它会使用传入数据覆盖现有数据。看来您的数据没有标签,因此如果influxdb中已经存在其时间戳的某些传入数据将覆盖现有数据

关于python-3.x - 使用influxdb-python的Influxdb批量插入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62768787/

相关文章:

graphite - 如何在 OpenTSDB 和 InfluxDB 或其他 TSDS 之间做出选择?

python - Discord 自机器人 : Direct messaging a user when joining a server/guild

从仅附加文件恢复 Redis 数据?

python - 如何使用 Python 在 Redis 缓存中设置由同一列名下的多行组成的值

spring-boot - 将 Spring Boot 指标暴露给 influxDB 以实现 grafana 可视化

influxdb - DB 文件夹占用大量空间,造成空间问题

python - 石头、剪刀、布 - 平局时如何开始新游戏

python-3.x - Pandas 格式列作为货币

python - Matlab pyversion 命令找不到 python3.4 的库

redis - lettuce redis brpop 命令