java - Mapreduce - 排序作业?

标签 java google-app-engine mapreduce

我正在使用 MapReduce(实际上只是映射)分四个阶段执行数据处理任务。每个阶段都是一个 MapReduce 作业。我需要它们按顺序运行,也就是说,在第 1 阶段完成之前不要开始第 2 阶段,等等。有没有人有这样做的经验可以分享?

理想情况下,我们会在一夜之间完成这 4 个作业序列,所以完成它 cron-able 也是一件好事。

谢谢

最佳答案

正如 Daniel 提到的,appengine-pipeline 库旨在解决这个问题。我回顾了将 MapReduce 作业链接在一起 in this blog post ,在“实现您自己的管道作业”部分下。

为了方便起见,我将相关部分粘贴到此处:

现在我们知道如何启动预定义的 MapreducePipeline,让我们看看如何实现和运行我们自己的自定义管道作业。管道库提供了一个低级库,用于在 appengine 中启动任意分布式计算作业,但是现在,我们将具体讨论如何使用它来帮助我们将 MapReduce 作业链接在一起。让我们扩展之前的示例,以输出字符和 ID 的反向索引。

首先,我们定义父管道作业。

class ChainMapReducePipeline(mapreduce.base_handler.PipelineBase):
def run(self):
    deduped_blob_key = (
    yield mapreduce.mapreduce_pipeline.MapreducePipeline(
        "test_combiner",
        "main.map",
        "main.reduce",
        "mapreduce.input_readers.RandomStringInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        combiner_spec="main.combine",
        mapper_params={
            "string_length": 1,
            "count": 500,
        },
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=16))

    char_to_id_index_blob_key = (
    yield mapreduce.mapreduce_pipeline.MapreducePipeline(
        "test_chain",
        "main.map2",
        "main.reduce2",
        "mapreduce.input_readers.BlobstoreLineInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        # Pass output from first job as input to second job
        mapper_params=(yield BlobKeys(deduped_blob_key)),
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=4))

这将启动与第一个示例相同的作业,获取该作业的输出,并将其输入到第二个作业中,从而反转每个条目。请注意,第一个管道产量的结果被传递到第二个作业的mapper_params。管道库使用魔法来检测第二个管道是否依赖于第一个管道的完成,并且在 deduped_blob_key 解析完成之前不会启动它。

接下来,我必须创建 BlobKeys 帮助器类。起初,我认为这没有必要,因为我可以这样做:

mapper_params={"blob_keys": deduped_blob_key},

但是,由于两个原因,这不起作用。第一个是“生成器管道无法直接访问它生成的子管道的输出”。上面的代码需要生成器管道使用第一个作业的输出创建一个临时字典对象,这是不允许的。第二个是 BlobstoreOutputWriter 返回的字符串格式为“/blobstore/”,但 BlobstoreLineInputReader 需要简单的“”。为了解决这些问题,我做了一个小助手 BlobKeys 类。您会发现自己在许多工作中都这样做,管道库甚至包括一组通用包装器,但它们不能在 MapreducePipeline 框架内工作,我将在本节底部讨论该框架。

class BlobKeys(third_party.mapreduce.base_handler.PipelineBase):
  """Returns a dictionary with the supplied keyword arguments."""

  def run(self, keys):
    # Remove the key from a string in this format:
    # /blobstore/<key>
    return {
        "blob_keys": [k.split("/")[-1] for k in keys]
    }

这是map2和reduce2函数的代码:

def map2(data):
    # BlobstoreLineInputReader.next() returns a tuple
    start_position, line = data
    # Split input based on previous reduce() output format
    elements = line.split(" - ")
    random_id = elements[0]
    char = elements[1]
    # Swap 'em
    yield (char, random_id)

def reduce2(key, values):
    # Create the reverse index entry
    yield "%s - %s\n" % (key, ",".join(values))

关于java - Mapreduce - 排序作业?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19501519/

相关文章:

java - Streamtokenizer 读取非常大的数字?

google-app-engine - 谷歌应用引擎 : No key_name attribute

Sun Grid Engine 上的 Python MapReduce

java - 带有可能的单引号的 ExecuteUpdate

Java:通过继承类实现接口(interface)

python - 如何在调用 get() 和 post() 之前做一些代码?

node.js - 如何以及从哪里开始使用 Google App Engine 编写应用程序?

mysql - Sqoop 导出到 MySQL 导出作业失败工具.ExportTool 但得到记录

hadoop - 为什么 DISTINCT 在 Pig 中比 GROUP BY/FOREACH 快

java - 在 Micronaut 中有使用@PostConstruct 的正确方法吗?