python - 我们如何使用 python sdk 在 Apache Beam 中读取带有外壳的 CSV 文件?

标签 python apache-beam apache-beam-io

我正在阅读一个逗号分隔的 CSV 文件,其中的字段用双引号括起来,其中一些字段的值中也有逗号,例如:"abc","def,ghi","jkl"有什么方法可以使用 Apache Beam 将此文件读入 PCollection 吗?

最佳答案

数据用双引号括起来的示例 csv 文件。

"AAA", "BBB", "Test, Test", "CCC" 
"111", "222, 333", "XXX", "YYY, ZZZ"

您可以使用 csv module来自标准库:
def print_row(element):
  print element

def parse_file(element):
  for line in csv.reader([element], quotechar='"', delimiter=',', quoting=csv.QUOTE_ALL, skipinitialspace=True):
    return line

parsed_csv = (
                p 
                | 'Read input file' >> beam.io.ReadFromText(input_filename)
                | 'Parse file' >> beam.Map(parse_file)
                | 'Print output' >> beam.Map(print_row)
             )

这给出了以下输出
['AAA', 'BBB', 'Test, Test', 'CCC']
['111', '222, 333', 'XXX', 'YYY, ZZZ ']

需要注意的一件事是 csv.reader对象期望 iterator这将返回 iterator的字符串。这意味着您不能将字符串直接传递给 reader() ,但您可以将其包含在 list 中如上。然后您将迭代输出以获得最终字符串。

关于python - 我们如何使用 python sdk 在 Apache Beam 中读取带有外壳的 CSV 文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57890169/

相关文章:

python - 模型编辑的 Django 内联链接

java.lang.IllegalStateException : Unable to return a default Coder in dataflow 2. X

google-cloud-platform - 为什么 Dataflow 步骤未启动?

java - 如何在 Apache Beam 项目中直接使用 google-cloud-storage

Java Apache Beam PCollections 以及如何使它们工作?

python - Apache Beam Python SDK ReadFromKafka 不接收数据

java - apache-beam java 的 ElasticsearchIO 是否支持 Templating 和 ValueProvider 参数?调用模板时出错

python - 为什么 Pylint 认为与零比较是不好的?

python - 自动将 Jupyter notebook 的副本维护为纯 Python 代码

python - 如何将元组列表转换为数组