google-cloud-dataflow - 使用 BlockingDataflowPipelineRunner 和 Dataflow 模板的后处理代码

标签 google-cloud-dataflow

我想在管道完成所有处理后运行一些代码,因此我使用 BlockingDataflowPipelineRunner并将代码放在 pipeline.run() 之后在main .

当我使用BlockingDataflowPipelineRunner从命令行运行作业时,这可以正常工作。 。 pipeline.run()下的代码管道完成处理后运行。

但是,当我尝试将作业作为模板运行时,它不起作用。我将该作业部署为模板(使用 TemplatingDataflowPipelineRunner ),然后尝试在云函数中运行该模板,如下所示:

dataflow.projects.templates.create({
    projectId: 'PROJECT ID HERE',
    resource: {
        parameters: {
            runner: 'BlockingDataflowPipelineRunner'
        },
        jobName: `JOB NAME HERE`,
        gcsPath: 'GCS TEMPLATE PATH HERE'
    }
}, function(err, response) {
    if (err) {
        // etc
    }
    callback();
});

运行者似乎没有接受。我可以将乱码放在运行程序下,并且作业仍然运行。

我在pipeline.run()下的代码每个作业运行时不会运行 - 它仅在我部署模板时运行。

pipeline.run()下的代码是否符合预期在main不会在每次作业运行时都运行吗?有没有管道完成后执行代码的解决方案?

(就上下文而言,pipeline.run() 之后的代码将文件从一个 Cloud Storage 存储桶移动到另一个 Cloud Storage 存储桶。它正在归档刚刚由作业处理的文件。)

最佳答案

是的,这是预期的行为。模板代表管道本身,并允许通过启动模板来(重新)执行管道。由于模板不包含 main() 方法中的任何代码,因此它不允许在管道执行后执行任何操作。

同样,dataflow.projects.templates.create API 只是启动模板的 API。

阻塞运行器完成此操作的方法是从创建的管道中获取作业 ID,并定期轮询以观察其何时完成。对于您的用例,您需要执行相同的操作:

  1. 执行dataflow.projects.templates.create(...)以创建Dataflow作业。这应该返回作业 ID。
  2. 定期(例如每 5-10 秒)轮询 dataflow.projects.jobs.get(...) 以检索具有给定 ID 的作业,并检查其处于什么状态。

关于google-cloud-dataflow - 使用 BlockingDataflowPipelineRunner 和 Dataflow 模板的后处理代码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44375064/

相关文章:

google-cloud-platform - 优化内存密集型数据流管道的 GCP 成本

python - 如何修复 "AttributeError: ' str' 对象在从 PubSub 读取并写入 BigQuery 的数据流管道中没有属性 'items'

google-bigquery - 通过加载作业(非流式)插入 BigQuery

google-app-engine - 从 Google Dataflow 保存到 Google Datastore

java - 源与 PTransform

Java- Apache 光束 : Read file from GCS with "UCS2-LE BOM" encoding

google-cloud-dataflow - Apache Beam:FlatMap与Map?

google-cloud-dataflow - apache beam 流式传输和同时处理多个文件和窗口连接?

google-cloud-storage - 压缩保存在Google云存储中的文件

google-cloud-dataflow - 优化 Apache Beam/DataFlow 中的重复转换