Python Apache Beam 多重输出和处理

标签 python apache-beam

我正在尝试使用以下流程在 Google Dataflow 上运行作业: process_flow

本质上采用单个数据源,根据字典中的某些值进行过滤,并为每个过滤条件创建单独的输出。

我编写了以下代码:

# List of values to filter by
x_list = [1, 2, 3]

with beam.Pipeline(options=PipelineOptions().from_dictionary(pipeline_params)) as p:
    # Read in newline JSON data - each line is a dictionary
    log_data = (
        p 
        | "Create " + input_file >> beam.io.textio.ReadFromText(input_file)
        | "Load " + input_file >> beam.FlatMap(lambda x: json.loads(x))
    )
    
    # For each value in x_list, filter log_data for dictionaries containing the value & write out to separate file
    for i in x_list:
        # Return dictionary if given key = value in filter
        filtered_log = log_data | "Filter_"+i >> beam.Filter(lambda x: x['key'] == i)
        # Do additional processing
        processed_log = process_pcoll(filtered_log, event)
        # Write final file
        output = (
            processed_log
            | 'Dump_json_'+filename >> beam.Map(json.dumps)
            | "Save_"+filename >> beam.io.WriteToText(output_fp+filename,num_shards=0,shard_name_template="")
        )

目前它只处理列表中的第一个值。我知道我可能必须使用 ParDo,但我不太确定如何将其纳入我的流程中。

最佳答案

您可以在Beam中使用TaggedOutput。编写一个beam函数来标记pcollection中的每个元素。

import uuid
import apache_beam as beam
import dateutil.parser
from apache_beam.pvalue import TaggedOutput

class TagData(beam.DoFn):
    def process(self, element):
        key = element.get('key')   
        yield TaggedOutput(key, element)
        


processed_tagged_log = processed_log | "tagged-data-by-key " >> beam.ParDo(TagData()).with_outputs(*x_list)  

现在您可以将此输出写入单独的文件/表

# Write files to separate tables/files
    for key in x_list:
        processed_tagged_log[key] | "save file %s" % uuid.uuid4()>> beam.io.WriteToText(output_fp+key+filename,num_shards=0,shard_name_template="")
        

来源: https://beam.apache.org/documentation/sdks/pydoc/2.0.0/_modules/apache_beam/pvalue.html

关于Python Apache Beam 多重输出和处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52075600/

相关文章:

python - 聚合 numpy 函数

python - 查找数百万个范围/间隔之间的重叠

python - 在 Pandas 中对元素进行分组时添加默认值

java - 数据流管道和发布订阅模拟器

java - Google Dataflow Beam 作业中的 SSL 握手异常

python - 使用 re.match 匹配字符串不起作用

python - 如何在保持目录结构的同时提取所有 .tar.gz 文件?

tensorflow - 在服务时在 Keras 模型中包含 BEAM 预处理图

python-3.x - 无法构建 pyarrow(对于 python 3.7),错误消息为 ERROR : Could not build wheels for pyarrow which use PEP 517

google-cloud-dataflow - CoGroupByKey 如何与discardingFiredPanes 配合使用?