我想知道是否可以在梁管道中使用 if 语句来根据不同的场景执行不同的转换。例如:
1) 将输入参数之一设为 backfill/regular,然后根据该输入参数决定是否以开头
(p
| fileio.MatchFiles(known_args.input_bucket)
| fileio.ReadMatches()
| beam.Map(lambda file: file.metadata.path, json.loads(file.read_utf8())))
或
p | beam.io.ReadFromText(known_args.input_file_name)
2) 如果文件名包含某个国家/地区名称(即美国),则调用 TransformUSA(beam.DoFn)
,否则调用 TransformAllCountries(beam.DoFn)
抱歉,如果这不是一个很好的问题,我在其他地方没有看到过这个问题,我正在尝试使我的代码模块化,而不是拥有单独的管道
最佳答案
管道完全可以有一个 if 语句,但请记住,在管道构建时应该知道一些事情。因此,例如:
with beam.Pipeline(...) as p:
if known_args.backfill == True:
input_pcoll = (p
| fileio.MatchFiles(known_args.input_bucket)
| fileio.ReadMatches()
| beam.Map(lambda file: file.read_utf8().split('\n'))
else:
input_pcoll = (p
| beam.io.ReadFromText(known_args.input_file_name)
然后,对于您的 TransformUSA
,您将执行以下操作:
if 'USA' in known_args.input_file_name:
next_pcoll = input_pcoll | beam.ParDo(TransformUSA())
else:
next_pcoll = input_pcoll | beam.ParDo(TransformAllCountries())
这有意义吗?
关于python - apache beam 数据流管道中步骤的 if 语句 (python),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58121250/