python - Importing a large zipped JSON file from Amazon S3 into AWS RDS-PostgreSQL with Python

Tags: python postgresql amazon-s3 etl amazon-rds

I am trying to import a large zipped JSON file from Amazon S3 into AWS RDS-PostgreSQL using Python, but it fails with the following error:

Traceback (most recent call last):
  File "my_code.py", line 64, in <module>
    file_content = f.read().decode('utf-8').splitlines(True)
  File "/usr/lib64/python3.6/zipfile.py", line 835, in read
    buf += self._read1(self.MAX_N)
  File "/usr/lib64/python3.6/zipfile.py", line 925, in _read1
    data = self._decompressor.decompress(data, n)
MemoryError

# my_code.py

import sys
import boto3
import psycopg2
import zipfile
import io
import json
import config

s3 = boto3.client('s3', aws_access_key_id=<aws_access_key_id>, aws_secret_access_key=<aws_secret_access_key>)
connection = psycopg2.connect(host=<host>, dbname=<dbname>, user=<user>, password=<password>)
cursor = connection.cursor()

bucket = sys.argv[1]
key = sys.argv[2]
obj = s3.get_object(Bucket=bucket, Key=key)


def insert_query():
    query = """
        INSERT INTO data_table
        SELECT
            (src.test->>'url')::varchar, (src.test->>'id')::bigint,
            (src.test->>'external_id')::bigint, (src.test->>'via')::jsonb
        FROM (SELECT CAST(%s AS JSONB) AS test) src
    """
    cursor.execute(query, (json.dumps(data),))


if key.endswith('.zip'):
    zip_files = obj['Body'].read()
    with io.BytesIO(zip_files) as zf:
        zf.seek(0)
        with zipfile.ZipFile(zf, mode='r') as z:
            for filename in z.namelist():
                with z.open(filename) as f:
                    file_content = f.read().decode('utf-8').splitlines(True)
                    for row in file_content:
                        data = json.loads(row)
                        insert_query()
if key.endswith('.json'):
    file_content = obj['Body'].read().decode('utf-8').splitlines(True)
    for row in file_content:
        data = json.loads(row)
        insert_query()

connection.commit()
connection.close()

Is there any solution to this problem? Any help would be greatly appreciated!

Best Answer

The problem is that you are trying to read the entire file into memory at once, which can run you out of memory if the file really is too large.

Instead, read the file one line at a time. Since each line of the file is evidently a JSON string of its own, you can process each line directly inside the loop:

with z.open(filename) as f:
    for line in f:
        insert_query(json.loads(line.decode('utf-8')))
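
The MemoryError can also be triggered one step earlier, by obj['Body'].read(), which pulls the whole compressed archive into memory before ZipFile ever sees it. Since ZipFile needs a seekable file, a minimal sketch of one possible workaround is to spool the archive to a temporary file on disk and then stream each member line by line (this reuses s3, bucket, key, and insert_query from the code above, and is an illustration rather than the only approach):

import json
import tempfile
import zipfile

with tempfile.TemporaryFile() as tmp:
    # The compressed bytes land on disk instead of in RAM.
    s3.download_fileobj(bucket, key, tmp)
    tmp.seek(0)
    with zipfile.ZipFile(tmp, mode='r') as z:
        for filename in z.namelist():
            with z.open(filename) as f:
                for line in f:  # ZipExtFile yields lines lazily
                    insert_query(json.loads(line.decode('utf-8')))

The plain .json branch can be streamed in the same spirit: recent botocore versions let you iterate the response body with obj['Body'].iter_lines() instead of calling .read() on it.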

By the way, your insert_query function should take data as a parameter:

def insert_query(data):
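
Putting the two fixes together, the function would look roughly like this (the same query as in the question, just taking data explicitly instead of reading a global):

def insert_query(data):
    query = """
        INSERT INTO data_table
        SELECT
            (src.test->>'url')::varchar, (src.test->>'id')::bigint,
            (src.test->>'external_id')::bigint, (src.test->>'via')::jsonb
        FROM (SELECT CAST(%s AS JSONB) AS test) src
    """
    cursor.execute(query, (json.dumps(data),))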

Regarding python - Importing a large zipped JSON file from Amazon S3 into AWS RDS-PostgreSQL with Python, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/51549272/
