mapreduce - Spark - 减少耗时过长的操作

标签 mapreduce apache-spark text-mining tf-idf

我正在使用 Spark 制作一个应用程序,它将运行一些主题提取算法。为此,首先我需要进行一些预处理,最后提取文档术语矩阵。我可以做到这一点,但对于(不是那么多)大文档集合(只有 2000,5MB),这个过程会花费很长时间。

因此,在调试时,我发现程序有点卡住的地方,并且它处于归约操作中。我在这部分代码中所做的就是计算每个术语在集合中出现的次数,所以首先我做了一个“映射”,为每个 rdd 计算它,然后我“减少”它,将结果保存在里面 HashMap 。 Map操作非常快,但是在Reduce中,它把操作分成40个 block ,每个 block 需要5~10分钟处理。

所以我试图找出我做错了什么,或者减少操作的成本是否那么高。

SparkConf:独立模式,使用本地[2]。我尝试将其用作“spark://master:7077”,并且它有效,但仍然很慢。

代码:

“filesIn”是一个 JavaPairRDD,其中键是文件路径,值是文件内容。 因此,首先是 map ,我在其中获取“filesIn”,分割单词,并计算它们的频率(在这种情况下,文件是什么并不重要) 然后是reduce,我在其中创建了一个HashMap(term、freq)。

JavaRDD<HashMap<String, Integer>> termDF_ = filesIn.map(new Function<Tuple2<String, String>, HashMap<String, Integer>>() {

        @Override
        public HashMap<String, Integer> call(Tuple2<String, String> t) throws Exception {
            String[] allWords = t._2.split(" ");

            HashMap<String, Double> hashTermFreq = new HashMap<String, Double>();
            ArrayList<String> words = new ArrayList<String>();
            ArrayList<String> terms = new ArrayList<String>();
            HashMap<String, Integer> termDF = new HashMap<String, Integer>();

            for (String term : allWords) {

                if (hashTermFreq.containsKey(term)) {
                    Double freq = hashTermFreq.get(term);
                    hashTermFreq.put(term, freq + 1);
                } else {
                    if (term.length() > 1) {
                        hashTermFreq.put(term, 1.0);
                        if (!terms.contains(term)) {
                            terms.add(term);
                        }
                        if (!words.contains(term)) {
                            words.add(term);
                            if (termDF.containsKey(term)) {
                                int value = termDF.get(term);
                                value++;
                                termDF.put(term, value);
                            } else {
                                termDF.put(term, 1);
                            }
                        }
                    }
                }
            }
            return termDF;
        }
    });

 HashMap<String, Integer> termDF = termDF_.reduce(new Function2<HashMap<String, Integer>, HashMap<String, Integer>, HashMap<String, Integer>>() {

        @Override
        public HashMap<String, Integer> call(HashMap<String, Integer> t1, HashMap<String, Integer> t2) throws Exception {
            HashMap<String, Integer> result = new HashMap<String, Integer>();

            Iterator iterator = t1.keySet().iterator();

            while (iterator.hasNext()) {
                String key = (String) iterator.next();
                if (result.containsKey(key) == false) {
                    result.put(key, t1.get(key));
                } else {
                    result.put(key, result.get(key) + 1);
                }

            }

            iterator = t2.keySet().iterator();

            while (iterator.hasNext()) {
                String key = (String) iterator.next();
                if (result.containsKey(key) == false) {
                    result.put(key, t2.get(key));
                } else {
                    result.put(key, result.get(key) + 1);
                }

            }

            return result;
        }
    });

谢谢!

最佳答案

好的,所以就在我的脑海中:

  • Spark 转换是惰性的。这意味着 map 在您调用后续 reduce 操作之前不会执行,因此您所描述的缓慢 reduce 很可能是缓慢的 map + 减少
  • ArrayList.contains 的复杂度为 O(N),因此所有这些 words.containsterms.contains 效率极低
  • map 逻辑有点可疑。尤其:
    • 如果术语已经出现,您永远不会进入else分支
    • 乍一看,wordsterms 应该具有完全相同的内容,并且应该等同于 hashTermFreq 键或 termDF 键。
    • 看起来 termDF 中的值只能取值 1。如果这就是您想要的并且忽略频率,那么创建 hashTermFreq 的意义何在?
  • 这里实现的
  • reduce阶段意味着低效的线性扫描,数据上的对象不断增长,而您真正想要的是reduceByKey

使用 Scala 作为伪代码,您的整个代码可以有效地表达如下:

val termDF = filesIn.flatMap{
  case (_, text) => 
    text.split(" ") // Split
    .toSet // Take unique terms 
    .filter(_.size > 1) // Remove single characters
    .map(term => (term, 1))} // map to pairs
  .reduceByKey(_ + _) // Reduce by key

termDF.collectAsMap // Optionally

最后看来你是在重新发明轮子。至少您需要的一些工具已经在mllib.feature中实现了。或ml.feature

关于mapreduce - Spark - 减少耗时过长的操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33558593/

相关文章:

hadoop - 什么是适合我索引和处理大数据的工具?

python - 如何从 PySpark DataFrame 中获取随机行?

machine-learning - 文档聚类的对数似然相似度

machine-learning - 词频特征归一化

hadoop - TaskTracker使用本地库

hadoop - 如何刷新Hadoop分布式缓存?

mongodb - 如何加速MongoDB的MapReduce?

java - 任务不可序列化 - Java 1.8 和 Spark 2.1.1

scala - 为什么 mapPartitions 不向标准输出打印任何内容?

r - 比较数据帧中的单词并计算每对最大单词长度的矩阵