python - 在 Apache Beam 中将 csv 转换为 dict,无需知道 header 字段名称

标签 python csv google-cloud-dataflow apache-beam python-3.8

给定输入 CSV(存储在本地或 Google Cloud Storage 中)

a,b,c
1,2,3
4,5,6

如何获得有值(value)的 PCollection

{a: 1, b: 2, c: 3}
{a: 4, b: 5, c: 6}

事先不知道 CSV header 的名称?

最佳答案

这里有两个选项。

(1) 您可以使用 beam.io.ReadFromText 跳过标题,然后使用 beam.Map(lambda line: zip(header_names, line.split(',')) 。这不会处理引用等(尽管可以调整这样做,可能使用 csv 模块,但处理多行行不能使用此方法)。

(2) 你可以使用光束dataframes API为此,例如

from apache_beam.dataframe.io import read_csv

with beam.Pipeline as p:
    df = p | beam.dataframe.io.read_csv("/path/to/filepattern")
    # Here you can use df as if it were a Pandas dataframe,
    # or you can convert it into a PCollection of dicts with
    # pcoll = beam.dataframe.convert.to_pcollection(df)

关于python - 在 Apache Beam 中将 csv 转换为 dict,无需知道 header 字段名称,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66689622/

相关文章:

java - 在 JSON 转换为 CSV 期间保持 JSON 键的顺序

javascript - angularJS,将复选框列表展平为 CSV

python - 在一项 Dataflow 作业中写入和读取 BigQuery

apache-kafka - 卡夫卡 : exactly once semantics configuration using Apache Beam

python - random.sample 仅返回字符而不是字符串

python - Pymongo 不像 JSON 那样解析

python - Sympy:类型错误:无法将表达式转换为 float

php - 如何通过使用 WSGI 执行 Python 脚本来避免 PHP exec()?

php - CSV 到 MYSQL 解析问题

google-cloud-dataflow - 使用 BlockingDataflowPipelineRunner 和 Dataflow 模板的后处理代码