python - 如何优雅地中止 App Engine 管道?

标签 python google-app-engine mapreduce

问题

我有一系列管道:

class PipelineA(base_handler.PipelineBase):
  def run(self, *args):
    # do something

class PipelineB(base_handler.PipelineBase):
  def run(self, *args):
    # do something


class EntryPipeline(base_handler.PipelineBase):
  def run(self):

    if some_condition():
      self.abort("Condition failed. Pipeline aborted!")

    yield PipelineA()

    mr_output = yield mapreduce_pipeline.MapreducePipeline(
      # mapreduce configs here
      # ...
    )

    yield PipelineB(mr_output)

p = EntryPipeline()
p.start()

EntryPipeline 中,我在启动 PipelineAMapreducePipelinePipelineB 之前测试了一些条件。如果条件失败,我想中止 EntryPipeline 和所有后续管道。

问题

  1. 什么是优雅的管道中止? self.abort() 是正确的方法还是我需要 sys.exit()

  2. 如果我想在PipelineA内进行堕胎怎么办?例如PipelineA 成功启动,但阻止后续管道(MapreducePipelinePipelineB)启动。

<小时/>

编辑:

我最终将条件语句移至 EntryPipeline 之外,因此仅当条件为真时才启动整个过程。否则我认为尼克的答案是正确的。

最佳答案

由于文档当前说“TODO:谈论显式中止和重试

我们必须阅读源代码:

https://github.com/GoogleCloudPlatform/appengine-pipelines/blob/master/python/src/pipeline/pipeline.py#L703

  def abort(self, abort_message=''):
    """Mark the entire pipeline up to the root as aborted.
    Note this should only be called from *outside* the context of a running
    pipeline. Synchronous and generator pipelines should raise the 'Abort'
    exception to cause this behavior during execution.

    Args:
      abort_message: Optional message explaining why the abort happened.

    Returns:
      True if the abort signal was sent successfully; False if the pipeline
      could not be aborted for any reason.
    """

因此,如果您拥有 some_pipeline 的句柄,但不是 self,则可以调用 some_pipeline.abort()...但是如果你想中止自己,你需要引发 Abort() ...这将冒泡到顶部并杀死整棵树

关于python - 如何优雅地中止 App Engine 管道?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29979036/

相关文章:

hadoop - 按 pig 中的相同值对数据包进行分组

hadoop - Hive MAPJOIN作业将多少数据视为 “too large”?

python - Django - 如何在表单中添加自定义错误消息?

python - 如何创建正则表达式来替换括号中的字符串?

java - gcloud : Default Application Credentials in appengine for Cloud Storage (java)

python - 在 python 中使用 IOT 适配器和 google pub/sub api 将 MQTT 与 GCP 集成

python - 谷歌应用引擎计数

python - AWS Boto/Warrant 库 : SRP authentication and credentials error

python - 经过训练的机器学习模型太大

hadoop - 我基于Hadoop采访场景的查询-解决方案可以在HIVE/PIG/MapReduce中