java - 通过Java和MapReduce构造文档项矩阵

标签 java hadoop mapreduce

背景:

我正在尝试使用MapReduce在Hadoop上的Java中创建一个“文档术语”矩阵。文档术语矩阵就像一个巨大的表,其中每一行代表一个文档,每一列代表一个可能的单词/术语。

问题陈述:

假设我已经有一个术语索引列表(这样我就知道哪个术语与哪个列号相关联),那么在每个文档中查找每个术语的索引的最佳方法是什么,以便我可以逐行构建矩阵行(即逐个文档)?

到目前为止,我可以想到两种方法:

方法1:

将术语索引列表存储在Hadoop分布式文件系统上。映射器每次读取新文档进行索引时,都会生成一个新的MapReduce作业-该文档中每个唯一单词的作业,其中每个作业都在分布式术语列表中查询其术语。这种方法听起来有些矫kill过正,因为我猜测开始新工作会带来一些开销,并且这种方法可能需要数以千万计的工作。另外,我不确定是否可以在另一个MapReduce作业中调用MapReduce作业。

方法2:

将术语索引列表追加到每个文档,以便每个映射器以术语索引列表的本地副本结尾。这种方法在存储上非常浪费(术语索引列表的副本与文档一样多)。另外,我不确定如何将术语索引列表与每个文档合并-我将它们合并在映射器中还是化简器中?

问题更新1

输入文件格式:

输入文件将是包含所有文档(产品评论)的CSV(逗号分隔值)文件。文件中没有列标题,但是每个评论的值按以下顺序显示:product_id,review_id,评论,星号。下面是一个伪造的示例:

“Product A”, “1”,“Product A is very, very expensive.”,”2”

“Product G”, ”2”, “Awesome product!!”, “5”



术语索引文件格式:

术语索引文件中的每一行包含以下内容:索引号,制表符和单词。每个可能的单词在索引文件中仅列出一次,因此术语索引文件类似于SQL表的主键(单词)列表。对于特定文档中的每个单词,我的暂定计划是遍历术语索引文件的每一行,直到找到该单词为止。然后将该单词的列号定义为与该单词关联的列/术语索引。以下是术语索引文件的示例,该文件是使用前面提到的两个示例产品评论构建的。

1 awesome

2 product

3 a

4 is

5 very

6 expensive



输出文件格式:

我希望输出为“矩阵市场”(MM)格式,这是用于压缩具有多个零的矩阵的行业标准。这是理想的格式,因为大多数审阅将只包含所有可能单词中的一小部分,因此对于特定文档,只需指定非零列即可。

MM格式的第一行具有三个制表符分隔的值:文档总数,单词列总数和MM文件中的总行数(不包括标题)。在标题之后,每一行都包含与特定条目关联的矩阵坐标,以及条目的值,其顺序为:reviewID,wordColumnID,条目(此单词在评论中出现多少次)。有关Matrix Market格式的更多详细信息,请参见以下链接:http://math.nist.gov/MatrixMarket/formats.html

每个评论的ID将等于其在文档术语矩阵中的行索引。这样,我可以将评论的ID保留为Matrix Market格式,以便仍可以将每个评论与其星级进行关联。我的最终目标(超出此问题的范围)是建立一种自然语言处理算法,以根据其文字预测新评论中的星星数。

使用上面的示例,最终的输出文件将如下所示(我无法让Stackoverflow显示选项卡而不是空格):

2 6 7

1 2 1

1 3 1

1 4 1

1 5 2

1 6 1

2 1 1

2 2 1

最佳答案

好吧,您可以使用类似于倒排索引概念的东西。

我建议这样做是因为,我假设两个文件都很大。因此,像一对一地相互比较将是真正的性能瓶颈。

这是一种可以使用的方式-

您可以将输入文件格式的csv文件(例如datafile1,datafile2)和术语索引文件(例如term_index_file)作为工作的输入。

然后在每个映射器中,过滤源文件名,如下所示-

映射器的伪代码-

map(key, row, context){
  String filename= ((FileSplit)context.getInputSplit()).getPath().getName();
   if (filename.startsWith("datafile") {

     //split the review_id, words from row
     ....
     context.write(new Text("word), new Text("-1 | review_id"));

   } else if(filename.startsWith("term_index_file") {
     //split index and word
     ....
     context.write(new Text("word"), new Text("index | 0"));
   }

}

,例如来自不同映射器的输出
Key       Value      source
product   -1|1       datafile
very       5|0       term_index_file
very      -1|1       datafile
product   -1|2       datafile
very      -1|1       datafile
product    2|0       term_index_file
...
...

说明(示例):
正如它清楚显示的那样,键将是您的单词,值将由两部分组成,并由定界符“|”分隔

如果源是数据文件,则发出key = product和value = -1 | 1,其中-1是虚拟元素,而1是review_id。

如果源是term_index_file,则发出key = product和value = 2 | 0,其中2是单词'product'的索引,而0是虚拟的review_id,我们将使用它进行排序-稍后将进行说明。

当然,如果我们将term_index_file作为作业的常规输入文件提供,则两个不同的映射器将不会处理重复的索引。
因此,'product,different'或term_index_file中的任何其他索引词将仅对一个映射器可用。注意,这仅对term_index_file有效,对数据文件无效。

下一步:

您可能知道,Hadoop mapreduce框架将按键分组
因此,您将获得类似的内容去往不同的 reducer ,
reduce-1: key=product, value=<-1|1, -1|2, 2|0>
reduce-2: key=very, value=<5|0, -1|1, -1|1>

但是,在上述情况下,我们有一个问题。我们希望对'|'之后的值进行排序即在reduce-1 -> 2|0, -1|1, -1|2 and in reduce-2 -> <5|0, -1|1, -1|1>

为此,您可以使用通过排序比较器实现的辅助排序。请为此搜索google,但here's a link可能会有所帮助。在这里提到它可能会花费很长时间。

在每个reduce-1中,由于值按上述顺序排序,因此当我们开始迭代时,我们将在第一个迭代中获得'0',并使用 index_id = 2 ,然后可以将其用于后续迭代。在接下来的两次迭代中,我们连续获取评论ID 1和2,并使用一个计数器,这样我们就可以跟踪任何重复的评论ID。当我们获得重复的评论ID时,这意味着一个单词在同一review_id行中出现了两次。仅当找到不同的review_id并针对特定index_id发出先前的review_id详细信息时,我们才会重置计数器,如下所示-
previous_review_id  + "\t" + index_id + "\t" + count

当循环结束时,我们将剩下一个previous_review_id,我们最终将以相同的方式发出它。

reducer 的伪代码-
reduce(key, Iterable values, context) {
  String index_id = null;
  count = 1;
  String previousReview_id = null;
  for(value: values) {
      Split split[] = values.split("\\|");
      ....

      //when consecutive review_ids are same, we increment count
      //and as soon as the review_id differ, we emit, reset the counter and print
      //the previous review_id detected.
      if (split[0].equals("-1") && split[1].equals(previousReview_id)) {
          count++;
      } else if(split[0].equals("-1") && !split[1].equals(prevValue)) {
          context.write(previousReview_id + "\t" + index_id + "\t" + count);
          previousReview_id = split[1];//resting with new review_id id
          count=1;//resetting count for new review_id
      } else {
         index_id = split[0]; 
      }
  }
  //the last  previousReview_id will be left out, 
  //so, writing it now after the loop  completion
  context.write(previousReview_id + "\t" + index_id + "\t" + count);

}

这项工作由多个reducer完成,以利用Hadoop以其最著名的性能-结果,最终输出将分散,类似于以下内容,与您期望的输出有所不同。
1 4 1
2 1 1
1 5 2
1 2 1
1 3 1
1 6 1
2 2 1

但是,如果您希望所有内容都根据review_id(作为所需的输出)进行排序,则可以编写另一个作业,该作业将使用单个reducer 和previos作业的输出作为输入。并同时计算 2 6 7 并将其放在输出的前面。

我认为,这只是一种方法(或想法),可能会对您有所帮助。您肯定想修改它,提出一种更好的算法,并以您认为会对自己有利的方式使用它。

与使用分隔符(例如“|”)相比,您还可以使用Composite键以获得更好的清晰度。

我愿意澄清。请问您是否认为,这可能对您有用。

谢谢!

关于java - 通过Java和MapReduce构造文档项矩阵,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18301478/

相关文章:

java - 生成一个范围内的随机数。无溢出。 java 语

java - 多色 JLabel

java - 如何为 feign bean 字段设置 JsonProperty 名称

hadoop - Zookeer是hadoop的一部分还是单独配置?

hadoop - Hadoop 中的并行 Map Reduce 作业

java - 以不同方式呈现信息的模式

java - 已达到Oozie Kill Node,但作业状态仍在运行

hadoop - 如何从 apache Drill 中查询 hdfs 零件文件

hadoop - 在同一集群中并置HBase和MapReduce进程

mapreduce - Hive 中的动态分区