python - Beam/Dataflow 自定义 Python 作业 - Cloud Storage 到 PubSub

标签 python google-cloud-storage apache-beam google-cloud-pubsub dataflow

我需要对一些数据执行一个非常简单的转换(从 JSON 中提取一个字符串),然后将它写入 PubSub - 我正在尝试使用自定义 python Dataflow 作业来执行此操作。

我写了一个成功写回 Cloud Storage 的作业,但即使是最简单的写入 PubSub(无转换)的尝试也会导致错误:JOB_MESSAGE_ERROR: Workflow failed. Causes: Expected custom source to have non-zero number of splits.
有没有人通过 Dataflow 从 GCS 成功写入 PubSub?

任何人都可以阐明这里出了什么问题吗?


def run(argv=None):

  parser = argparse.ArgumentParser()
  parser.add_argument('--input',
                      dest='input',
                      help='Input file to process.')
  parser.add_argument('--output',
                      dest='output',                      
                      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  with beam.Pipeline(options=pipeline_options) as p:

    lines = p | ReadFromText(known_args.input)

    output = lines #Obviously not necessary but this is where my simple extract goes

    output | beam.io.WriteToPubSub(known_args.output) # This doesn't

最佳答案

当前无法实现此方案,因为当您在 Dataflow 中使用流模式时,the only source you can use is PubSub .并且您无法切换到批处理模式,因为 apache 光束 PubSub sources and sinks仅可用于流式传输(用于像 Dataflow runner 这样的远程执行)。

这就是为什么您可以在没有 WriteToPubSub 和流标志的情况下执行管道的原因。

关于python - Beam/Dataflow 自定义 Python 作业 - Cloud Storage 到 PubSub,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56868887/

相关文章:

Python Mechanize 错误 +

python - 更新 python 中的部分表面或透明表面

google-cloud-storage - 谷歌存储 : how to check if an object exists?

php - 带有服务帐户的 Google Cloud Storage - 403 禁止

python - 使用 apache beam/google 云数据流读取多行 JSON

python - Django Rest Framework ViewSet 不按字段过滤

python - ubuntu 上 python 的应用程序引擎导入本地数据存储

python - 从 GCloud 激活服务帐户

google-cloud-dataflow - 如何在 Apache Beam 中写入多个文件?

python - 在数据流中包含其他文件