google-app-engine - How do I dynamically pass parameters to the map function in GAE mapreduce?

Tags: google-app-engine mapreduce

I need to run a dynamic mapreduce job: each time the job runs (for example, in response to a user request), parameters have to be passed to the map and reduce functions.

How can I do this? I can't find anything in the documentation about making map and reduce behave dynamically at runtime.

import webapp2
from mapreduce import base_handler, mapreduce_pipeline


class MatchProcessing(webapp2.RequestHandler):

    def get(self):
        # Per-request values that should reach the map and reduce functions
        requestKeyID = int(self.request.get('riderbeeRequestID'))
        userKey = self.request.get('userKey')
        pipeline = MatchingPipeline(requestKeyID, userKey)
        pipeline.start()
        self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id)


class MatchingPipeline(base_handler.PipelineBase):
    def run(self, requestKeyID, userKey):
        yield mapreduce_pipeline.MapreducePipeline(
            "riderbee_matching",
            "tasks.matchingMR.riderbee_map",
            "tasks.matchingMR.riderbee_reduce",
            "mapreduce.input_readers.DatastoreInputReader",
            "mapreduce.output_writers.BlobstoreOutputWriter",
            mapper_params={
                "entity_kind": "models.rides.RiderbeeRequest",
                "requestKeyID": requestKeyID,
                "userKey": userKey,
            },
            reducer_params={
                "mime_type": "text/plain",
            },
            shards=16)


def riderbee_map(riderbeeRequest):
    # would like to access the requestKeyID and userKey parameters that were passed in mapper_params
    # so that we can do some processing based on that

    yield (riderbeeRequest.user.email, riderbeeRequest.key().id())


def riderbee_reduce(key, values):
    # would like to access the requestKeyID and userKey parameters that were passed earlier, perhaps through reducer_params
    # so that we can do some processing based on that

    yield "%s: %s\n" % (key, len(values))

Any help would be appreciated.

Best answer

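I'm fairly sure you can specify the parameters in the mapper parameters (mapper_params in the pipeline call) and then read them back from the context module. See http://code.google.com/p/appengine-mapreduce/wiki/UserGuidePython#Mapper_parameters for more details.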
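As a rough illustration of that suggestion, here is a minimal sketch of a map function that reads the values placed in mapper_params. It assumes the appengine-mapreduce library exposes the running job through context.get() and its mapreduce_spec.mapper.params dictionary. The job_params helper, the optional ctx argument, and the skip rule are hypothetical additions for illustration and are not part of the question.

```python
try:
    # Only importable inside an App Engine app that bundles appengine-mapreduce.
    from mapreduce import context
except ImportError:
    context = None  # allows the helper to be exercised outside App Engine


def job_params(ctx=None):
    """Return the params dict of the currently running stage.

    Assumption: context.get() returns an object whose
    mapreduce_spec.mapper.params holds the mapper_params dict.
    """
    if ctx is None:
        ctx = context.get()
    return ctx.mapreduce_spec.mapper.params


def riderbee_map(riderbeeRequest, ctx=None):
    # ctx is a hypothetical hook for testing; the framework calls the
    # mapper with the entity only, so ctx stays None in production.
    params = job_params(ctx)
    requestKeyID = params["requestKeyID"]
    userKey = params["userKey"]  # available for any per-user processing

    # Hypothetical use of the parameter: skip the request that started the job.
    if riderbeeRequest.key().id() == requestKeyID:
        return

    yield (riderbeeRequest.user.email, riderbeeRequest.key().id())
```

The reduce stage appears to run as its own mapper job, so the same lookup inside riderbee_reduce would presumably return whatever was placed in reducer_params. That is an assumption worth checking against the library version in use.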

This question, "How do I dynamically pass parameters to the map function in GAE mapreduce?", comes from a similar question on Stack Overflow: https://stackoverflow.com/questions/11269381/
