python - 如何转换 Pandas Data Frame Schema

标签 python pandas schema parquet pyarrow

我正在使用 pandas.read_csv 读取 CSV 文件,它会自动检测模式,就像

Column1: string
Column2: string
Column3: string
Column4: int64
Column5: double
Column6: double
__index_level_0__: int64

然后,我尝试用 pyarrow.parquet.write_table 将其编写为 Parquet 表。但是,我想为新的 Parquet 文件使用以下架构

Column1: string
Column2: string
Column3: string
Column4: string
Column5: string
Column6: string
__index_level_0__: int64

但我收到一条错误消息“表架构与用于创建文件的架构不匹配”。这是我用来将 CSV 文件转换为 Parquet 文件的代码 borrowed from here

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

csv_file = 'C:/input.csv'
parquet_file = 'C:/putput.parquet'
chunksize = 100_000

csv_stream = pd.read_csv(csv_file, sep=',', chunksize=chunksize, low_memory=False, encoding="ISO-8859-1")

for i, chunk in enumerate(csv_stream):
    print("Chunk", i)
    if i == 0:
        # Guess the schema of the CSV file from the first chunk
        # parquet_schema = pa.Table.from_pandas(df=chunk).schema
        parquet_schema = pa.schema([
            ('c1', pa.string()),
            ('c2', pa.string()),
            ('c3', pa.string()),
            ('c4', pa.string()),
            ('c5', pa.string()),
            ('c6', pa.string())
        ])
        # Open a Parquet file for writing
        parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')
    # Write CSV chunk to the parquet file
    table = pa.Table.from_pandas(chunk, schema=parquet_schema)
    parquet_writer.write_table(table)

parquet_writer.close()

最佳答案

df=df.astype(str) 将使用内置的 astype() 将 pandas 数据框中的所有数据转换为字符串,使用 object dtypes方法

您还可以更改单个列的类型,例如 df['Column4'] = df['Column4'].astype(str)

您需要做的就是在 parquet_writer.write_table(table) 之前更改数据框的类型或其列的子集。总之,您的代码将如下所示。

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

csv_file = 'C:/input.csv'
parquet_file = 'C:/putput.parquet'
chunksize = 100_000

def convert(df):
    df['Column4'] = df['Column4'].astype(str)
    return df

csv_stream = pd.read_csv(csv_file, sep=',', chunksize=chunksize, low_memory=False, encoding="ISO-8859-1")

for i, chunk in enumerate(csv_stream):
    print("Chunk", i)
    if i == 0:            
        converted = convert(chunk)
        parquet_schema = pa.Table.from_pandas(df=converted).schema

        # Open a Parquet file for writing
        parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')

    # Write CSV chunk to the parquet file
    converted = convert(chunk)
    table = pa.Table.from_pandas(converted, parquet_schema)
    parquet_writer.write_table(table)

parquet_writer.close()

关于python - 如何转换 Pandas Data Frame Schema,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53233613/

相关文章:

python - 在没有 python 的情况下运行 Python 应用程序

python - 从 mysql 打印/解析 csv blob 文件

ruby-on-rails - 使用工具/插件/脚本可视化 Rails schema.rb

schema - 使用什么版本编号方案?

python - 如何从 VGG 层创建 Keras Model()

python - 当我只在行中获得聚类时,如何微调 K 均值聚类?

python - matplotlib savefig() 大小控制

python - 从 Python Panda 到交叉过滤器的列平均值计算不正确

Python Pandas 根据时间序列中缺失的顺序值添加行

database - craigslist/kijiji 架构