我需要运行一个动态的 mapreduce 作业,因为每次运行 mapreduce 作业时都需要将参数传递给 map 和 reduce 函数(例如,响应用户请求)。
我该如何实现?我在文档中的任何地方都看不到如何在运行时对 map 和 reduce 进行动态处理。
class MatchProcessing(webapp2.RequestHandler):
def get(self):
requestKeyID=int(self.request.get('riderbeeRequestID'))
userKey=self.request.get('userKey')
pipeline = MatchingPipeline(requestKeyID, userKey)
pipeline.start()
self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id)
class MatchingPipeline(base_handler.PipelineBase):
def run(self, requestKeyID, userKey):
yield mapreduce_pipeline.MapreducePipeline(
"riderbee_matching",
"tasks.matchingMR.riderbee_map",
"tasks.matchingMR.riderbee_reduce",
"mapreduce.input_readers.DatastoreInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params={
"entity_kind": "models.rides.RiderbeeRequest",
"requestKeyID": requestKeyID,
"userKey": userKey,
},
reducer_params={
"mime_type": "text/plain",
},
shards=16)
def riderbee_map(riderbeeRequest):
# would like to access the requestKeyID and userKey parameters that were passed in mapper_params
# so that we can do some processing based on that
yield (riderbeeRequest.user.email, riderbeeRequest.key().id())
def riderbee_reduce(key, values):
# would like to access the requestKeyID and userKey parameters that were passed earlier, perhaps through reducer_params
# so that we can do some processing based on that
yield "%s: %s\n" % (key, len(values))
请帮忙?
最佳答案
我很确定您可以在 mapper_parameters 中指定参数,然后从上下文模块中读取它们。参见 http://code.google.com/p/appengine-mapreduce/wiki/UserGuidePython#Mapper_parameters了解更多详情。
关于google-app-engine - 如何动态传递参数以映射 GAE mapreduce 上的函数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11269381/