python - 如何从 GCP 存储桶读取 Apache Beam 中的多个文件

标签 python python-3.x apache-beam dataflow apache-beam-io

我正在尝试使用 Apache Beam 读取 GCP 中的多个文件并对其应用一些子集。我准备了两个管道,它们仅适用于一个文件,但当我在多个文件上尝试它们时会失败。除此之外,如果可能的话,我会很方便地将我的管道合并为一个,或者是否有一种方法来编排它们,以便它们按顺序工作。现在管道在本地工作,但我的最终目标是使用 Dataflow 运行它们。

我使用textio.ReadFromText 和textio.ReadAllFromText,但在多个文件的情况下我无法使这两种方法都工作。

def toJson(file):
    with open(file) as f:
        return json.load(f)


 with beam.Pipeline(options=PipelineOptions()) as p:
       files = (p
        | beam.io.textio.ReadFromText("gs://my_bucket/file1.txt.gz", skip_header_lines = 0)
        | beam.io.WriteToText("/home/test",
                   file_name_suffix=".json", num_shards=1 , append_trailing_newlines = True))

 with beam.Pipeline(options=PipelineOptions()) as p:
lines = (p  
            | 'read_data' >> beam.Create(['test-00000-of-00001.json'])
            | "toJson" >> beam.Map(toJson)
            | "takeItems" >> beam.FlatMap(lambda line: line["Items"])
            | "takeSubjects" >> beam.FlatMap(lambda line: line['data']['subjects'])
            | beam.combiners.Count.PerElement()
            | beam.io.WriteToText("/home/items",
                   file_name_suffix=".txt", num_shards=1 , append_trailing_newlines = True))

这两个管道适用于单个文件,但我有数百个相同格式的文件,并且希望利用并行计算的优势。

有没有办法让这个管道适用于同一目录下的多个文件?

是否可以在单个管道中执行此操作,而不是创建两个不同的管道? (从存储桶中向工作节点写入文件不太方便。)

最佳答案

我解决了如何使其适用于多个文件,但无法使其在单个管道中运行。我使用了for循环,然后使用了beam.Flatten选项。

这是我的解决方案:

file_list = ["gs://my_bucket/file*.txt.gz"]
res_list = ["/home/subject_test_{}-00000-of-00001.json".format(i) for i in range(len(file_list))]

with beam.Pipeline(options=PipelineOptions()) as p:
    for i,file in enumerate(file_list):
       (p 
        | "Read Text {}".format(i) >> beam.io.textio.ReadFromText(file, skip_header_lines = 0)
        | "Write TExt {}".format(i) >> beam.io.WriteToText("/home/subject_test_{}".format(i),
                   file_name_suffix=".json", num_shards=1 , append_trailing_newlines = True))

pcols = []
with beam.Pipeline(options=PipelineOptions()) as p:
   for i,res in enumerate(res_list):
         pcol = (p   | 'read_data_{}'.format(i) >> beam.Create([res])
            | "toJson_{}".format(i) >> beam.Map(toJson)
            | "takeItems_{}".format(i) >> beam.FlatMap(lambda line: line["Items"])
            | "takeSubjects_{}".format(i) >> beam.FlatMap(lambda line: line['data']['subjects']))
        pcols.append(pcol)
   out = (pcols
    | beam.Flatten()
    | beam.combiners.Count.PerElement()
    | beam.io.WriteToText("/home/items",
                   file_name_suffix=".txt", num_shards=1 , append_trailing_newlines = True))

关于python - 如何从 GCP 存储桶读取 Apache Beam 中的多个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58764501/

相关文章:

python - 使用字典 python 从不同路径导入 csv

python - 无法安装 TensorFlow Python 依赖项

python-3.x - Keras 'flow_from_directory' 非常慢

python - 使用python,如何计算图像中具有指定尺寸的对象的面积

google-cloud-dataflow - Google 数据流 GroupByKey 可以处理热键吗?

python - NumPy 将 8 位图像转换为 16/32 位图像

python - Flask-Admin ModelView 中的只读文本字段

python - 从另一个 Flask 应用程序调用 Flask 应用程序的 REST API

google-cloud-platform - 云数据流/光束 : Side Input Limit

python - 如何检索 PCollection 的内容并将其分配给普通变量?