hadoop - 合并分布式应用程序中的输入

标签 hadoop apache-spark distributed bigdata

简介

我必须编写分布式应用程序,该应用程序计算3条记录的唯一值的最大数量。我没有这方面的经验,也不了解框架。我的输入可能如下所示:

u1: u2,u3,u4,u5,u6
u2: u1,u4,u6,u7,u8
u3: u1,u4,u5,u9
u4: u1,u2,u3,u6
...

那么结果的开始应该是:
(u1,u2,u3), u4,u5,u6,u7,u8,u9 => count=6
(u1,u2,u4), u3,u5,u6,u7,u8    => count=5
(u1,u3,u4), u2,u5,u6,u9       => count=4
(u2,u3,u4), u1,u5,u6,u7,u8,u9 => count=6
...

因此,我的方法是先合并每两个记录,然后再将每对合并的记录与每条记录合并。

问题

我可以像hadoop / spark这样的框架中执行这样的操作,例如在多个输入行上同时工作(合并)吗?还是我的方法不正确,我应该采取其他方式吗?

任何建议将被认真考虑。

最佳答案

Can I do such operation like working (merge) on more than one input row on the same time in framewors like hadoop/spark?



是的你可以。

Or maybe my approach is incorrect and I should do this different way?



这取决于数据的大小。如果您的数据很小,则在本地进行处理会更快,更轻松。如果您的数据量巨大(至少数百GB),通常的策略是将数据保存到HDFS(分布式文件系统)中,并使用Mapreduce / Spark进行分析。

用scala编写的示例spark应用程序:
    object MyCounter {
      val sparkConf = new SparkConf().setAppName("My Counter")
      val sc = new SparkContext(sparkConf)

      def main(args: Array[String]) {
        val inputFile = sc.textFile("hdfs:///inputfile.txt")
        val keys = inputFile.map(line => line.substring(0, 2)) // get "u1" from "u1: u2,u3,u4,u5,u6"

        val triplets = keys.cartesian(keys).cartesian(keys)
          .map(z => (z._1._1, z._1._2, z._2))
          .filter(z => !z._1.equals(z._2) && !z._1.equals(z._3) && !z._2.equals(z._3)) // get "(u1,u2,u3)" triplets

        // If you have small numbers of (u1,u2,u3) triplets, it's better prepare them locally.

        val res = triplets.cartesian(inputFile).filter(z => {
          z._2.startsWith(z._1._1) || z._2.startsWith(z._1._2) || z._2.startsWith(z._1._3)
        }) // (u1,u2,u3) only matches line starts with u1,u2,u3, for example "u1: u2,u3,u4,u5,u6"
          .reduceByKey((a, b) => a + b) // merge three lines
          .map(z => {
          val line = z._2
          val values = line.split(",")
          //count unique values using set
          val set = new util.HashSet[String]()
          for (value <- values) {
            set.add(value)
          }
          "key=" + z._1 + ", count=" + set.size()  // the result from one mapper is a string
        }).collect()

        for (line <- res) {
          println(line)
        }
      }
    }
  • 该代码未经测试。而且效率不高。它可以进行一些优化(例如,删除不必要的map-reduce步骤。)
  • 您可以使用Python / Java重写相同的版本。
  • 您可以使用Hadoop / Mapreduce实现相同的逻辑
  • 关于hadoop - 合并分布式应用程序中的输入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37628490/

    相关文章:

    java - 如何将 pdf/images 存储到 HBase 表

    scala - 使用 ftp 在 Apache Spark 中读取远程机器上的文件

    tensorflow - 分布式 Tensorflow 中的数据批处理

    hadoop - 无法在多节点hadoop集群设置中运行datanode,需要建议

    hadoop - Cloudera Manager 有什么替代方案吗? (CDH)

    java - 如何在eclipse中使用java运行嵌入式pig程序?

    java - 将带有换行符的固定长度的文本文件作为属性值之一读入 JavaRDD

    apache-spark - Apache Spark : Yarn logs Analysis

    python - 如何在 dask/distributed 中存储 worker-local 变量

    database - 如何使用数据库服务器进行分布式作业调度?