python - GAE Python 上的 Mapreduce - 导致 ReducePipeline 在最终确定时发出回调?

标签 python google-app-engine mapreduce

我想在 MapReduce 作业完成/完成后执行自定义回调函数。

我找到的关于此问题的唯一有用的引用是 a somewhat outdated Google site和一个相关的,但又似乎过时的Stackoverflow question .

这两个来源都假设我使用 control.start_map 来启动 Mapreduce 作业,并依赖于 start_map 采用关键字参数 mapreduce_parameters 这一事实code> 其中可以指定一个 done_callback 参数来指定完成时应调用的 url。但是,我使用的是一种不同的方法(据我所知,是最近的首选方法),其中自定义管道的 run 方法生成 Mapreduce 管道:

yield mapreduce_pipeline.MapreducePipeline(
    "word_count",
    "main.word_count_map",
    "main.word_count_reduce",
    "mapreduce.input_readers.BlobstoreZipInputReader",
    "mapreduce.output_writers.BlobstoreOutputWriter",
    mapper_params={
        "blob_key": blobkey,
    },
    reducer_params={
        "mime_type": "text/plain",
    },
    shards=16)

MapreducePipeline 的签名不允许使用 mapreduce_parameters 参数。我唯一可以看到源代码中出现的回调引用的地方是 mapper_pipeline.MapperPipeline.run,但它似乎仅在内部使用。

那么,有没有办法获取其中的回调参数?

如果没有,是否有人对在何处以及如何扩展库以提供此类功能有好的想法?

最佳答案

我将 Mapreduce 管道范例设置为如下所示:

class MRRecalculateSupportsPipeline(base_handler.PipelineBase):

    def run(self, user_key):
        # ...
        yield mapreduce_pipeline.MapreducePipeline('user_recalculate_supports',
                'myapp.mapreduces.user_recalculate_supports_map',
                'myapp.mapreduces.user_recalculate_supports_reduce',
                'mapreduce.input_readers.DatastoreInputReader', output_writer_spec=None,
                mapper_params={"""..."""})

如果您想捕获此管道的完成情况,您有两种选择。

A) 使用 pipeline.After 在 MR 管道完成后运行完成管道。

        pipe_future = yield mapreduce_pipeline.MapreducePipeline('user_recalculate_supports',
                'myapp.mapreduces.user_recalculate_supports_map',
                'myapp.mapreduces.user_recalculate_supports_reduce',
                'mapreduce.input_readers.DatastoreInputReader', output_writer_spec=None,
                mapper_params={"""..."""})
        with pipeline.After(pipe_future):
            yield CalcCompletePipeline(...)  # this could be a mapreduce pipeline, or any pipeline using the same base_handler.PipelineBase parent class.

B) 使用顶级管道的finalized 方法来处理完成。就我个人而言,我会坚持使用选项 A,因为您可以在 /_ah/*/status?root= View 中跟踪路径。

class EmailNewReleasePipeline(base_handler.PipelineBase):
    """Email followers about a new release"""
    # TODO: product_key is the name of the parameter, but it's built for albums ...

    def run(self, product_key, testing=False):
            # Send those emails ...
            yield mapreduce_pipeline.MapreducePipeline(...)

    def finalized(self):
        """Save product as launched"""
        ...
        product.launched = True
        product.put()

以下是 finalization of a pipeline 上的文档.

关于python - GAE Python 上的 Mapreduce - 导致 ReducePipeline 在最终确定时发出回调?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20688593/

相关文章:

java - Hadoop CustomInputFormat NullPointerException

python - 如何根据条件动态构建查询?

python - 如何通过python编辑文件夹中所有文本文件的特定行?

java - App Engine - 打开的实例太多

google-app-engine - 在 AppEngine/Go 中获取当前部署的时间戳

python - 如何从代码中检查当前实例的内存使用情况

hadoop - 如何使用Hadoop MapReduce将数据从AWS S3导入HDFS

hadoop - 为 AvroParquetInputFormat 设置多个 projectionSchemas 的问题

python - 如何重定向到 Flask 中的外部域?

python - 创建自定义响应消息时出错