python - apache beam 数据流管道中步骤的 if 语句 (python)

标签 python google-cloud-dataflow apache-beam apache-beam-io

我想知道是否可以在梁管道中使用 if 语句来根据不同的场景执行不同的转换。例如:

1) 将输入参数之一设为 backfill/regular,然后根据该输入参数决定是否以开头

(p 
            | fileio.MatchFiles(known_args.input_bucket)
            | fileio.ReadMatches()
            | beam.Map(lambda file: file.metadata.path, json.loads(file.read_utf8())))

p | beam.io.ReadFromText(known_args.input_file_name)

2) 如果文件名包含某个国家/地区名称(即美国),则调用 TransformUSA(beam.DoFn),否则调用 TransformAllCountries(beam.DoFn)

抱歉,如果这不是一个很好的问题,我在其他地方没有看到过这个问题,我正在尝试使我的代码模块化,而不是拥有单独的管道

最佳答案

管道完全可以有一个 if 语句,但请记住,在管道构建时应该知道一些事情。因此,例如:

with beam.Pipeline(...) as p:
  if known_args.backfill == True:
    input_pcoll = (p
                   | fileio.MatchFiles(known_args.input_bucket)
                   | fileio.ReadMatches()
                   | beam.Map(lambda file: file.read_utf8().split('\n'))
  else:
    input_pcoll = (p
                   | beam.io.ReadFromText(known_args.input_file_name)

然后,对于您的 TransformUSA,您将执行以下操作:

if 'USA' in known_args.input_file_name:
  next_pcoll = input_pcoll | beam.ParDo(TransformUSA())
else:
  next_pcoll = input_pcoll | beam.ParDo(TransformAllCountries())

这有意义吗?

关于python - apache beam 数据流管道中步骤的 if 语句 (python),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58121250/

相关文章:

python - 如何为机器学习模型调整图像大小

python - 如何在 python 2.7 中使用 pypi 安装包?

java - gcp 数据流流程元素不会转到下一个 ParDO 函数

google-cloud-platform - Google Data Fusion 能否进行与 DataPrep 相同的数据清理?

java - Eclipse:期间发生内部错误: "Update Hierarchy"

machine-learning - 使用 flink runner 在梁上进行 Tensorflow 变换

python - redis.exceptions.ConnectionError : Error 97 connecting to localhost:6379. 协议(protocol)不支持的地址族

python - 如何在 Apache Beam(Python SDK)中对早期触发进行单元测试

design-patterns - 在 apache beam 中调用外部 API 的更好方法

python - 如何解决AttributeError : '_Environ' object has no attribute 'has_key'