google-cloud-dataflow - ParDo 中的侧输出 | Apache Beam Python SDK

标签 google-cloud-dataflow apache-beam

由于文档仅适用于 JAVA,我无法真正理解它的含义。

它指出 - “虽然 ParDo 总是产生一个主输出 PCollection(作为 apply 的返回值),你也可以让你的 ParDo 产生任意数量的额外输出 PCollection。如果你选择有多个输出,你的 ParDo 将返回所有输出 PCollections(包括主输出)捆绑在一起。例如,在 Java 中,输出 PCollections 捆绑在类型安全的 PCollectionTuple 中。”

我明白捆绑在一起意味着什么,但是如果我在我的 DoFn 中生成一个标签,它是否会在所有其他输出为空的情况下生成一个捆绑包,并在代码中遇到其他输出时生成其他输出?或者它等待所有 yield 准备好输入和输出它们一起打包?

文档中没有很清楚它。虽然我认为它不会等待,只是在遇到时产生,但我仍然需要了解发生了什么。

最佳答案

回答这个问题的最好方法是举个例子。这个例子是 available in Beam .

假设您要运行一个字数统计管道(例如,计算每个单词在文档中出现的次数)。为此,您需要将文件中的行拆分为单独的单词。考虑到您还想单独计算字长。您的拆分变换将如下所示:

with beam.Pipeline(options=pipeline_options) as p:

    lines = p | ReadFromText(known_args.input)  # Read in the file

    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
    split_lines_result = (lines
                          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
                              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
                              main='words'))

    short_words = split_lines_result['words']
    character_count = split_lines_result[
        SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT]

在这种情况下,每个都是不同的 PCollection ,具有正确的元素。 DoFn将负责拆分其输出,并通过标记元素来实现。看:

class SplitLinesToWordsFn(beam.DoFn):
  OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'

  def process(self, element):
    # yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged
    # collection.
    yield pvalue.TaggedOutput(
        self.OUTPUT_TAG_CHARACTER_COUNT, len(element))

    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      # yield word to add it to the main collection.
      yield word

如您所见,对于主输出,您不需要标记元素,但对于其他输出,您需要标记。

关于google-cloud-dataflow - ParDo 中的侧输出 | Apache Beam Python SDK,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52338640/

相关文章:

java - Apache 光束 : Unable to find registrar for gs

google-cloud-dataflow - 光束.BigQueryIO : What is numFileShards for?

google-cloud-platform - 使用本地直接运行程序时,Apache Beam 返回 "Input values must not be mutated in any way."

google-cloud-dataflow - Apache Beam 是否支持其输出的自定义文件名?

java - 从数据流中的 PubsubMessage 获取属性

python - 在管道中使用 apache beam 参数

java - 使用 Samza Runner 执行 Beam Pipeline 时出现 org.apache.beam.sdk.util.UserCodeException

java - 数据流/ApacheBeam 将输入限制为前 X 数量?

java - Google 云 Bigquery UDF 限制

python - 有没有办法使用 ReadFromText 转换(Python)在 Apache Beam 中读取多行 csv 文件?