python - Creating a PCollection of GCS objects in Google Cloud Dataflow/Apache Beam

Tags: python google-cloud-dataflow apache-beam

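I am trying to learn my way around Cloud Dataflow. For learning purposes, I stripped their basic Word Count example down to a simple bare-bones function. I want to create a PCollection of GCS object file names. I get a message saying that the function ReadFromText() is not iterable.

My understanding of a PCollection is that it is a list of objects to be processed. I could write a loop that submits each object for processing one at a time, but that is not what I want to do. I want to keep this part dynamic and let Apache Beam handle the rest. I only want to supply the list of files in GCS.

So far I have successfully processed single-element PCollections. I also do not want to use a glob such as 'gs://dataflow-samples/shakespeare/*'.

I have also looked at the gcsio module and ReadAllFromText(). That also reports that the function is not iterable. Please advise.

Here is what I have so far: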

"""A word-counting workflow."""

from __future__ import absolute_import

import argparse
import logging
import re

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp import gcsio


class WordExtractingDoFn(beam.DoFn):
  """Parse each line of input text into words."""

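  def __init__(self):
    super(WordExtractingDoFn, self).__init__()

  def process(self, element):
    text_line = element.strip()
    # process() must return an iterable of outputs; returning the bare string
    # would make Beam emit each character as a separate element.
    yield text_line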


def run(argv=None):
  """Main entry point; defines and runs the wordcount pipeline."""

  p = beam.Pipeline(options=PipelineOptions())

  # Read the text file[pattern] into a PCollection.
  elements =                ['gs://dataflow-samples/shakespeare/1kinghenryiv.txt',
                            'gs://dataflow-samples/shakespeare/1kinghenryvi.txt',
                            'gs://dataflow-samples/shakespeare/2kinghenryiv.txt',
                            'gs://dataflow-samples/shakespeare/2kinghenryvi.txt',
                            'gs://dataflow-samples/shakespeare/3kinghenryvi.txt',
                            'gs://dataflow-samples/shakespeare/allswellthatendswell.txt',
                            'gs://dataflow-samples/shakespeare/antonyandcleopatra.txt',
                            'gs://dataflow-samples/shakespeare/asyoulikeit.txt',
                            'gs://dataflow-samples/shakespeare/comedyoferrors.txt',
                            'gs://dataflow-samples/shakespeare/coriolanus.txt',
                            'gs://dataflow-samples/shakespeare/cymbeline.txt',
                            'gs://dataflow-samples/shakespeare/hamlet.txt',
                            'gs://dataflow-samples/shakespeare/juliuscaesar.txt',
                            'gs://dataflow-samples/shakespeare/kinghenryv.txt',
                            'gs://dataflow-samples/shakespeare/kinghenryviii.txt',
                            'gs://dataflow-samples/shakespeare/kingjohn.txt',
                            'gs://dataflow-samples/shakespeare/kinglear.txt',
                            'gs://dataflow-samples/shakespeare/kingrichardii.txt',
                            'gs://dataflow-samples/shakespeare/kingrichardiii.txt',
                            'gs://dataflow-samples/shakespeare/loverscomplaint.txt',
                            'gs://dataflow-samples/shakespeare/loveslabourslost.txt',
                            'gs://dataflow-samples/shakespeare/macbeth.txt',
                            'gs://dataflow-samples/shakespeare/measureforemeasure.txt',
                            'gs://dataflow-samples/shakespeare/merchantofvenice.txt',
                            'gs://dataflow-samples/shakespeare/merrywivesofwindsor.txt',
                            'gs://dataflow-samples/shakespeare/midsummersnightsdream.txt',
                            'gs://dataflow-samples/shakespeare/muchadoaboutnothing.txt',
                            'gs://dataflow-samples/shakespeare/othello.txt',
                            'gs://dataflow-samples/shakespeare/periclesprinceoftyre.txt',
                            'gs://dataflow-samples/shakespeare/rapeoflucrece.txt',
                            'gs://dataflow-samples/shakespeare/romeoandjuliet.txt',
                            'gs://dataflow-samples/shakespeare/sonnets.txt',
                            'gs://dataflow-samples/shakespeare/tamingoftheshrew.txt',
                            'gs://dataflow-samples/shakespeare/tempest.txt',
                            'gs://dataflow-samples/shakespeare/timonofathens.txt',
                            'gs://dataflow-samples/shakespeare/titusandronicus.txt',
                            'gs://dataflow-samples/shakespeare/troilusandcressida.txt',
                            'gs://dataflow-samples/shakespeare/twelfthnight.txt',
                            'gs://dataflow-samples/shakespeare/twogentlemenofverona.txt',
                            'gs://dataflow-samples/shakespeare/various.txt',
                            'gs://dataflow-samples/shakespeare/venusandadonis.txt',
                            'gs://dataflow-samples/shakespeare/winterstale.txt']

  books = p | beam.Create((elements))
  #print (books)

  lines = p | 'read' >> ReadFromText(books)

  counts = (lines
            | 'split' >> (beam.ParDo(WordExtractingDoFn())
                          .with_output_types(unicode)))

  output = counts | 'write' >> WriteToText('gs://ihopeitworks/Users/see.txt',
                                           shard_name_template='')

  result = p.run()
  result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

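Best answer

You are very close. Try the following: instead of passing books as an argument to ReadFromText, pipe the books PCollection into ReadAllFromText. ReadFromText expects a file pattern string when it is constructed, whereas ReadAllFromText takes a PCollection of file names or patterns as its input. Hope that helps.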

books = p | beam.Create((elements))
lines = books | 'read' >> ReadAllFromText()

Source: the original question on Stack Overflow, https://stackoverflow.com/questions/53801600/
