python-3.x - 在 Apache Beam (python) 中使用 ToList 输出作为 AsSingleton 或 AsList 的输入

标签 python-3.x apache-beam

当我尝试使用beam.combiners.ToList 的输出作为beam.pvalue.AsSingleton 或beam.pvalue.AsList 的输入以试验侧输入时出现意外错误。我能够使用单个数字(例如:列表的平均值)作为辅助输入,但是对于列表和字典,我得到了异常(exception)。对于beam.pvalue.AsSingleton,我得到:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-4-0c1df7400a03> in <module>
     15 chain_total = chain_1 | chain_2
     16 
---> 17 chain_1 | beam.Map(m, beam.pvalue.AsList(chain_2))
     18 
     19 chain_total | beam.Map(print)

~/.cache/pypoetry/virtualenvs/prototyping-with-tensorflow-py3.6/lib/python3.6/site-packages/apache_beam/pvalue.py in __init__(self, pcoll)
    297     self.pvalue = pcoll
    298     self._window_mapping_fn = sideinputs.default_window_mapping_fn(
--> 299         pcoll.windowing.windowfn)
    300 
    301   def _view_options(self):

AttributeError: '_ChainedPTransform' object has no attribute 'windowing'

对于beam.pvalue.AsList,我得到:
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-7-0c1df7400a03> in <module>
     15 chain_total = chain_1 | chain_2
     16 
---> 17 chain_1 | beam.Map(m, beam.pvalue.AsList(chain_2))
     18 
     19 chain_total | beam.Map(print)

~/.cache/pypoetry/virtualenvs/prototyping-with-tensorflow-py3.6/lib/python3.6/site-packages/apache_beam/pvalue.py in __init__(self, pcoll)
    297     self.pvalue = pcoll
    298     self._window_mapping_fn = sideinputs.default_window_mapping_fn(
--> 299         pcoll.windowing.windowfn)
    300 
    301   def _view_options(self):

AttributeError: '_ChainedPTransform' object has no attribute 'windowing'

这是我正在运行的代码

import apache_beam as beam


def m(x, u):
    print(u)
    return x


p = beam.Pipeline()

data_beam = Create(['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c'])

chain_1 = p | data_beam | beam.combiners.Count.PerElement()
chain_2 = beam.Map(lambda x: x[0]) | beam.combiners.ToList()

chain_total = chain_1 | chain_2

chain_1 | beam.Map(m, beam.pvalue.AsSingleton(chain_2))

chain_total | beam.Map(print)

p.run()

用 beam.pvalue.AsList 替换 beam.pvalue.AsSingleton 得到另一个错误。我正在使用 Apache Beam python SDK 2.11.0 版。

最佳答案

PCollections 是 Beam 中的名词,PTransforms 是动词。
当您开始管道时,p = beam.Pipeline()是你唯一的名词。 (注:即使 Create 是动词。)

通过在这个名词上应用各种动词,您可以根据以下规则创建其他名词:

  • new_noun = existing_noun | verb

  • 混淆的主要来源似乎是因为您还可以将动词链接在一起:
  • fancy_verb = verb1 | verb2

  • 虽然这些示例中的语法看起来非常相似,但返回的值具有不同的类型。

    这里的主要问题是只有名词可以被视为 SideInputs。

    在您提供的示例中,chain_2是由两个动词组合而成的动词,错误信息确认_ChainedPTransform (实际上是 kind of PTransform ,顾名思义)不能传递给任何 AsSideInput 函数。

    关于python-3.x - 在 Apache Beam (python) 中使用 ToList 输出作为 AsSingleton 或 AsList 的输入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55676378/

    相关文章:

    python - 节流光束应用中的一个步骤

    python - 如何使用 Python 通过暴力破解提取 .zip 文件

    python - 检查是否可以通过给定长度的跳跃来达到一个数字?

    python - 我如何着手解析具有模式的大型 JSON 文件的内容,而不是必须手动访问每个条目?

    python - 在列表中如何计算序列中的字符串

    python-3.x - 无法在 Windows 上的 Git Bash(Windows 应用商店)中安装 pylint

    google-cloud-platform - 有没有办法在 Cloud BigTable 中转储 bool 对象?

    google-cloud-platform - 合并两个 PCollection (Apache beam)

    java-8 - Apache Beam-无法在具有多个输出标签的DoFn上推断编码器

    python - 使用循环分支 Apache Beam 管道