Python ETL - Loading a large dataset into an Oracle database in batches or iteratively with cx_Oracle

Tags: python oracle dataframe plsql cx-oracle

I am using Python to load a 10MM-record dataset into an Oracle database table. The dataframe is created without any problem, but when loading it through cx_Oracle the record count is too large and the load errors out.

I am trying to iterate over the dataframe and bulk-load the 10MM records by inserting them 100k records at a time.

The code shown below works, but only for small datasets that fit in the allocated memory. I need something that works for both batches and large datasets.

I have tried iterating over the rows, but that takes a very long time. I also tried loading a smaller dataframe - that works but does not meet the goal.

I also tried using bindarraysize and arraysize to feed the dataframe in, but nothing worked.

import pandas as pd
import datetime
import sys
import re
from itertools import groupby, islice, takewhile
import cx_Oracle

format = '%y_%m_%d'

TODAY = datetime.date.today()
add = datetime.timedelta(days=1)
yesterday = datetime.date.today() - add
dd = datetime.date.strftime(TODAY,format)

# connection variables
connection = cx_Oracle.connect("user/Oracle_database_connect_info")
cur = connection.cursor()

# dataframe headers
columns = ['C1','C2','C3','C4']

# -- >> test and sample the file
csv_df = pd.read_csv(r'csvfile_location')

# add record_id for values
csv_df_idx = csv_df.index.values +1
csv_df.insert(0,column = 'RECORD_ID' , value=csv_df_idx)


### TABLE ALREADY CREATED IN DATABASE ###


for index, row in csv_df.iterrows():
    ### Insert and iterate to insert records
    ### Convert to list for easy load into DB
    csv_df_dataset_lst = csv_df.values.tolist()
    insert_statement = """
    INSERT INTO TEST_LOAD
        ( RECORD_ID ,C1 ,C2 ,C3 ,C4)values (:1,:2,:3,:4,:5)    """

    # control number of records to bind for insert
    # cur.bindarraysize = 100000 # --->>> did not work
    # cur.arraysize = 100000 # --->>> did not work
    cur.executemany(insert_statement,csv_df_dataset_lst)
    connection.commit()
connection.close()

Best Answer

Figured it out. The trick was to write a function that splits the dataframe into segments based on the size of the batch to load.
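As a quick illustration (not part of the load script itself, and with made-up toy column names), the same kind of chunker applied to a small dataframe yields consecutive row slices of the requested size, with a shorter final slice:

import pandas as pd

def chunker(seq, size):
    # yield consecutive row slices of length `size`; the last slice may be shorter
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

toy_df = pd.DataFrame({'RECORD_ID': range(1, 8), 'C1': list('ABCDEFG')})
for chunk in chunker(toy_df, 3):
    print(len(chunk), chunk['RECORD_ID'].tolist())
# prints batches of 3, 3 and 1 rows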

Below is the final code.

import pandas as pd
import numpy as np
import datetime
import sys
import re
from itertools import groupby, islice, takewhile
import cx_Oracle

format = '%y_%m_%d'

TODAY = datetime.date.today()
add = datetime.timedelta(days=1)
yesterday = datetime.date.today() - add
dd = datetime.date.strftime(TODAY,format)

# connection variables
connection = cx_Oracle.connect("user/Oracle_database_connect_info")
cur = connection.cursor()

# dataframe headers
columns = ['C1','C2','C3','C4']

# -- >> test and sample the file
csv_df = pd.read_csv(r'csvfile_location')

# add record_id for values
csv_df_idx = csv_df.index.values +1
csv_df.insert(0,column = 'RECORD_ID' , value=csv_df_idx)


### TABLE ALREADY CREATED IN DATABASE ###


# set batch size ie record count
batch_size = 100000

# create chunker function to separate the dataframe into batches
# Note: the last batch will contain the fewest records.
def chunker(seq,size):
    return(seq[pos:pos+size] for pos in range(0,len(seq),size))


insert_statement = """
    INSERT INTO TEST_LOAD
        ( RECORD_ID ,C1 ,C2 ,C3 ,C4)values (:1,:2,:3,:4,:5)    """

# Optionally use cursor.prepare so the Oracle DB avoids re-parsing the insert statement on every batch
try:
    cur.prepare(insert_statement)
except cx_Oracle.DatabaseError as e:
    print('Failed to prepare insert cursor')
    print(e)
    sys.exit(1)

for chunk in chunker(csv_df, batch_size):
    ### Insert and iterate to insert records
    ### Convert the current chunk (not the whole dataframe) to a list for easy load into DB
    csv_df_dataset_lst = chunk.values.tolist()

    # passing None reuses the statement prepared above
    cur.executemany(None, csv_df_dataset_lst)
    connection.commit()
    # record count to monitor the loading
    cur.execute("""SELECT COUNT(*), SYSDATE FROM TEST_LOAD GROUP BY SYSDATE""")
    record_out = cur.fetchall()
    for row in record_out:
        print(row)
connection.close()
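Note that this version still reads the entire CSV into a single dataframe before batching, so memory pressure is only reduced on the database side. If holding the whole file in memory is the real constraint, a variation worth trying (a rough sketch only, reusing the placeholder connection string, file location and table name from above, and not tested against the original data) is to let pandas chunk the file itself with read_csv(chunksize=...), so only one batch is in memory at a time:

import pandas as pd
import cx_Oracle

batch_size = 100000

connection = cx_Oracle.connect("user/Oracle_database_connect_info")
cur = connection.cursor()

insert_statement = """
    INSERT INTO TEST_LOAD
        (RECORD_ID, C1, C2, C3, C4) VALUES (:1, :2, :3, :4, :5)"""
cur.prepare(insert_statement)

record_id = 0
# read_csv with chunksize returns an iterator of dataframes instead of one big frame
for chunk in pd.read_csv(r'csvfile_location', chunksize=batch_size):
    # add a running RECORD_ID, continuing the numbering across chunks
    chunk.insert(0, column='RECORD_ID',
                 value=list(range(record_id + 1, record_id + 1 + len(chunk))))
    record_id += len(chunk)

    # passing None reuses the prepared insert statement
    cur.executemany(None, chunk.values.tolist())
    connection.commit()

connection.close()

The trade-off is the same as in the batched version: committing once per batch keeps each transaction small, at the cost of a partially loaded table if the job dies mid-run.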

Regarding "Python ETL - loading a large dataset into an Oracle database in batches or iteratively with cx_Oracle", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58296799/
