python - pd.DataFrame.to_sql(method ="multi") GCP Postgres 引发 struct.error 'h' 格式需要 -32768 <= number <= 32767 和用户定义的 dtypes

标签 python pandas postgresql ubuntu sqlalchemy

在这里发布我的第一个问题 - 请放轻松!
我正在尝试将 Pandas 数据框(3,000,000 x 8)写入 GCP 托管的 Postgres 数据库。我正在使用类似于以下内容的内容来编写我的数据。

from sqlalchemy import Table,MetaData,Column,String,Integer,Float,DateTime,ARRAY,BigInteger
import pandas as pd
import sqlalchemy
from datetime import datetime
from google.cloud.sql.connector import connector
import numpy as np
import random

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "path-to-your-keys"
Base = declarative_base()

os.environ['DB_USER'] = "root-user"
os.environ['DB_PROJECTID']  ="project-id-from-GCP"
os.environ["DB_NAME"] = "DB-NAME"
os.environ["DB_PASS"] = "your-password-for-the-GCP-DB"

def getconn():
    conn = connector.connect(
        os.environ["DB_PROJECTID"],
        "pg8000",
        user=os.environ["DB_USER"],
        password=os.environ["DB_PASS"],
        db=os.environ["DB_NAME"],
    )
    return conn

db = sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=getconn,
    )

def make_dummy_df():
    rng = np.random.default_rng()
    df = pd.DataFrame(rng.integers(0, 50000, size=(3000000, 1)), columns=['window'])
    df['start'] = list(pd.date_range(start=datetime(2020,1,1),end=datetime.today(),periods=int(df.shape[0])))
    df['end'] = list(pd.date_range(start=datetime(2020,1,1),end=datetime.today(),periods=int(df.shape[0])))
    df['degree'] = [random.randint(0,40) for _ in range(df.shape[0])]
    df['x'] = [random.sample(range(10000, 100000), 10) for _ in range(df.shape[0])]
    df['y'] = [random.sample(range(-100, 100), 10) for _ in range(df.shape[0])]
    df['z'] = [random.sample(range(100, 1000), 10) for _ in range(df.shape[0])]      
    df['index'] = df.index                  
    return df

if __name__=="__main__":
    df = make_dummy_df()
    df.to_sql(
        "test1",
        con=db,
        if_exists="replace",
        index=False,
        method="multi",
        chunksize=10000,
        dtype={
             "index":BigInteger(),
             "window":Integer(),
             "degree":Integer(),
             "start":DateTime(),
             "end":DateTime(),
             "x":ARRAY(Float),
             "y":ARRAY(Float),
             "z":ARRAY(Float)
         })

中运行时引发以下错误Linux 环境。 linux 机器是 AWS EC2 Ubuntu Server 20.04 LTS (HVM) 上的虚拟机,SSD 卷类型 c4.8xlarge
Linux ip-xxx-xx-xx-xx A.B.C-D-aws #21~20.04.1-Ubuntu SMP x86_64 x86_64 x86_64 GNU/Linu
Traceback (most recent call last):                                                                                                                       
  File "testing.py", line 53, in <module>                                                                               
    df.to_sql(                                                                                                                       
  File "/home/ubuntu/.local/lib/python3.8/site-packages/pandas/core/generic.py", line 2963, in to_sql                     
    return sql.to_sql(                                                         
  File "/home/ubuntu/.local/lib/python3.8/site-packages/pandas/io/sql.py", line 697, in to_sql                          
    return pandas_sql.to_sql(                                                  
  File "/home/ubuntu/.local/lib/python3.8/site-packages/pandas/io/sql.py", line 1739, in to_sql                         
    total_inserted = sql_engine.insert_records(                                
  File "/home/ubuntu/.local/lib/python3.8/site-packages/pandas/io/sql.py", line 1322, in insert_records           
    return table.insert(chunksize=chunksize, method=method)                    
  File "/home/ubuntu/.local/lib/python3.8/site-packages/pandas/io/sql.py", line 950, in insert 
    num_inserted = exec_insert(conn, keys, chunk_iter)                                                                       
  File "/home/ubuntu/.local/lib/python3.8/site-packages/pandas/io/sql.py", line 873, in _execute_insert_multi        
    result = conn.execute(stmt)                                                                                                                       
  File "/home/ubuntu/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1295, in execute        
    return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)                                                                               
  File "/home/ubuntu/.local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection       
    return connection._execute_clauseelement(                                                                               
  File "/home/ubuntu/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1487, in _execute_clauseelement       
    ret = self._execute_context(                                                                                                                       
  File "/home/ubuntu/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1851, in _execute_context       
    self._handle_dbapi_exception(                                                                                                                       
  File "/home/ubuntu/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2036, in _handle_dbapi_exception       
    util.raise_(exc_info[1], with_traceback=exc_info[2])
  File "/home/ubuntu/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/home/ubuntu/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1808, in _execute_context
    self.dialect.do_execute(
  File "/home/ubuntu/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/pg8000/dbapi.py", line 455, in execute
    self._context = self._c.execute_unnamed(
  File "/home/ubuntu/.local/lib/python3.8/site-packages/pg8000/core.py", line 627, in execute_unnamed
    self.send_PARSE(NULL_BYTE, statement, oids)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/pg8000/core.py", line 601, in send_PARSE
    val.extend(h_pack(len(oids)))
struct.error: 'h' format requires -32768 <= number <= 32767
以下是模块依赖版本:
Numpy:                        1.22.3
Pandas:                       1.4.1
SqlAlchemy:                   1.4.32
cloud-sql-python-connector:   0.5.2
这个问题特别与 GCP + SqlAlchemy + df.to_sql(method="multi") 中的 Postgres 相关。如果解决了问题,字段的 dtypes 可以改变。但是 df 中的数组必须作为 ARRAY 写入数据库。
我目前已经测试了使用以下方法将 DataFrame 分 block 成更小的尺寸:
n = int(round(df.shape[0]/20,0))
chunks = [df[i:i+n] for i in range(0,df.shape[0],n)]
然后迭代 block 。我还尝试从 DataFrame 中删除单个列并写入 DB 以尝试确定是否是一列导致问题 - 不走运。我已经制作了所有整数字段-> BigInteger() - 不走运。
有趣的是,如果您不将可选的 kwarg“方法”作为“多”传递 - df.to_sql 可以正常工作。我认为问题可能出在“多”中-但我不确定。
谢谢

最佳答案

通过类似的设置,我用更小的 block 大小避免了这个错误。

关于python - pd.DataFrame.to_sql(method ="multi") GCP Postgres 引发 struct.error 'h' 格式需要 -32768 <= number <= 32767 和用户定义的 dtypes,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71752718/

相关文章:

sql - 选择 Rails 中一个属性的最大值限定为另一个属性的记录

python - Django 模型 i18n 的内容

python - 指定自定义假期的 Pandas

python - 使用 python statsmodels 修复summary_col 中的标签外生变量

python-3.x - 如何构建基于时间的 EWMA

mysql - 如何在Mysql中同时更新不同的行

python - Pyparsing Forward() 语法递归

python - 如何抑制结果中显示的 Tensorflow 警告

python - 在 R 单元、rpy2、Jupyter Notebook 中使用 pandas 数据帧时出错

java - 从日期减去一小时 SQL