我想在管道完成所有处理后运行一些代码,因此我使用 BlockingDataflowPipelineRunner
并将代码放在 pipeline.run()
之后在main
.
当我使用BlockingDataflowPipelineRunner
从命令行运行作业时,这可以正常工作。 。 pipeline.run()
下的代码管道完成处理后运行。
但是,当我尝试将作业作为模板运行时,它不起作用。我将该作业部署为模板(使用 TemplatingDataflowPipelineRunner
),然后尝试在云函数中运行该模板,如下所示:
dataflow.projects.templates.create({
projectId: 'PROJECT ID HERE',
resource: {
parameters: {
runner: 'BlockingDataflowPipelineRunner'
},
jobName: `JOB NAME HERE`,
gcsPath: 'GCS TEMPLATE PATH HERE'
}
}, function(err, response) {
if (err) {
// etc
}
callback();
});
运行者似乎没有接受。我可以将乱码放在运行程序下,并且作业仍然运行。
我在pipeline.run()
下的代码每个作业运行时不会运行 - 它仅在我部署模板时运行。
pipeline.run()
下的代码是否符合预期在main
不会在每次作业运行时都运行吗?有没有管道完成后执行代码的解决方案?
(就上下文而言,pipeline.run()
之后的代码将文件从一个 Cloud Storage 存储桶移动到另一个 Cloud Storage 存储桶。它正在归档刚刚由作业处理的文件。)
最佳答案
是的,这是预期的行为。模板代表管道本身,并允许通过启动模板来(重新)执行管道。由于模板不包含 main()
方法中的任何代码,因此它不允许在管道执行后执行任何操作。
同样,dataflow.projects.templates.create
API 只是启动模板的 API。
阻塞运行器完成此操作的方法是从创建的管道中获取作业 ID,并定期轮询以观察其何时完成。对于您的用例,您需要执行相同的操作:
- 执行
dataflow.projects.templates.create(...)
以创建Dataflow作业。这应该返回作业 ID。 - 定期(例如每 5-10 秒)轮询
dataflow.projects.jobs.get(...)
以检索具有给定 ID 的作业,并检查其处于什么状态。
关于google-cloud-dataflow - 使用 BlockingDataflowPipelineRunner 和 Dataflow 模板的后处理代码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44375064/