parquet - 从 Python 增量写入 Parquet 数据集

标签 parquet pyarrow

我正在从我的 Python 应用程序中写入大于 RAM 的数据 - 基本上将数据从 SQLAlchemy 转储到 Parque。我的解决方案的灵感来自 this question 。即使增加the batch size as hinted here我面临的问题是:

  • RAM 使用量大幅增长

  • 写入器在一段时间后开始变慢(写入吞吐量速度下降超过 5 倍)

我的假设是,这是因为当行数增加时,ParquetWriter 元数据管理变得昂贵。我想我应该切换到 datasets这将允许编写者在处理刷新元数据的过程中关闭文件。

我的问题是

  • 是否有使用 Python 和 Parquet 编写增量数据集的示例

  • 我的假设正确还是错误,使用数据集是否有助于维持写入器吞吐量?

我的蒸馏代码:


writer = pq.ParquetWriter(
                    fname,
                    Candle.to_pyarrow_schema(small_candles),
                    compression='snappy',
                    allow_truncated_timestamps=True,
                    version='2.0',  # Highest available schema
                    data_page_version='2.0',  # Highest available schema
            ) as writer:

    def writeout():
        nonlocal data
        duration = time.time() - stats["started"]
        throughout = stats["candles_processed"] / duration
        logger.info("Writing Parquet table for candle %s, throughput is %s", "{:,}".format(stats["candles_processed"]), throughout)
        writer.write_table(
            pa.Table.from_pydict(
                    data,
                    writer.schema
            )
        )
        data = dict.fromkeys(data.keys(), [])
        process = psutil.Process(os.getpid())
        logger.info("Flushed %s writer, the memory usage is %s", bucket, process.memory_info())

    # Use massive yield_per() or otherwise we are leaking memory
    for item in query.yield_per(100_000):
        frame = construct_frame(row_type, item)
        for key, value in frame.items():
            data[key].append(value)

        stats["candles_processed"] += 1

        # Do regular checkopoints to avoid out of memory
        # and to log the progress to the console
        # For fine tuning Parquet writer see
        # https://issues.apache.org/jira/browse/ARROW-10052
        if stats["candles_processed"] % 100_000 == 0:
            writeout()

最佳答案

在本例中,原因是错误地使用了 Python 列表和字典作为工作缓冲区,正如 @0x26res 所指出的。

确保正确清除列表字典后,内存消耗问题就可以忽略不计了。

关于parquet - 从 Python 增量写入 Parquet 数据集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68375254/

相关文章:

io - 使用 pyarrow.parquet 编写数据集时,是否可以覆盖文件名的 uuid 自动分配?

Python - 模块 'pyarrow' 没有属性 'Table' 错误

python - 在 Glue pythonshell 中使用 pyarrow - ModuleNotFoundError : No module named 'pyarrow.lib'

python - 在 `pyarrow` 测试中使用内存文件系统

python - 如何使用 pyarrow 将 Pandas 数据帧设置/获取到 Redis

python - 如何用时区解析时间戳?

python-3.x - pip install pyarrow 在 Linux/在 docker 中失败

java - 从 Azure Blob 容器读取 Parquet 数据,而无需在本地下载

python - 重新分区 Dask DataFrame 以获得均匀的分区

pandas - AWS Redshift Spectrum 十进制类型读取 Parquet 双类型