hadoop - 如何在 Spark 中处理 Map<Key,value>?

标签 hadoop dictionary apache-spark bigdata

我是 Spark 编程的新手,我试图找出一个字符串在文件中针对某个键出现的次数。
这是我的输入:

-------------
2017-04-13 15:56:57.147::ProductSelectPanel::1291::PRODUCT_SALE_ENTRY::INAPHYD00124::1::CROC0008::CROCIN 120MG 60ML SYP::5::0::
2017-04-13 15:57:01.008::ProductSelectPanel::1599::PRODUCT_SALE_WITH_BARCODE::INAPHYD00124::1::CROC0008::CROCIN 120MG 60ML SYP::4::1::1013065197
2017-04-13 15:57:09.182::ProductSelectPanel::1118::ALTERNATIVE_PRODUCT_ENTRY::INAPHYD00124::1::CROC0005::CROCIN 500MG TAB::0
2017-04-13 15:57:15.153::ProductSelectPanel::1121::NO_STOCK_PRODUCT::INAPHYD00124::1::CROC0005::CROCIN 500MG TAB::0::0::
2017-04-13 15:57:19.696::ProductSelectPanel::1118::ALTERNATIVE_PRODUCT_ENTRY::INAPHYD00124::1::CROC0005::CROCIN 500MG TAB::0
2017-04-13 15:57:23.190::ProductSelectPanel::1291::PRODUCT_SALE_ENTRY::INAPHYD00124::1::CALP0005::CALPOL 500MG TAB::110::0::
2017-04-13 15:56:57.147::ProductSelectPanel::1291::PRODUCT_SALE_ENTRY::INAPHYD00124::1::CROC0008::CROCIN 120MG 60ML SYP::5::0::
2017-04-13 15:57:01.008::ProductSelectPanel::1599::PRODUCT_SALE_WITH_BARCODE::INAPHYD00124::1::CROC0008::CROCIN 120MG 60ML SYP::4::1::1013065197
2017-04-13 15:57:09.182::ProductSelectPanel::1118::ALTERNATIVE_PRODUCT_ENTRY::INAPHYD00124::1::CROC0005::CROCIN 500MG TAB::0
2017-04-13 15:57:15.153::ProductSelectPanel::1121::NO_STOCK_PRODUCT::INAPHYD00124::1::CROC0005::CROCIN 500MG TAB::0::0::
2017-04-13 15:57:19.696::ProductSelectPanel::1118::ALTERNATIVE_PRODUCT_ENTRY::INAPHYD00124::1::CROC0005::CROCIN 500MG TAB::0
2017-04-13 15:57:23.190::ProductSelectPanel::1291::PRODUCT_SALE_ENTRY::INAPHYD00124::1::CALP0005::CALPOL 500MG TAB::110::0::
2017-04-13 15:56:57.147::ProductSelectPanel::1291::PRODUCT_SALE_ENTRY::INAPHYD00124::1::CROC0008::CROCIN 120MG 60ML SYP::5::0::
2017-04-13 15:57:01.008::ProductSelectPanel::1599::PRODUCT_SALE_WITH_BARCODE::INAPHYD00124::1::CROC0008::CROCIN 120MG 60ML SYP::4::1::1013065197
2017-04-13 15:57:09.182::ProductSelectPanel::1118::ALTERNATIVE_PRODUCT_ENTRY::INAPHYD00124::1::CROC0005::CROCIN 500MG TAB::0
.......

我的 Spark 程序是这样的。
final Function<String, List<String>> LINE_MAPPER=new Function<String, List<String>>() {

            @Override
            public List<String> call(String line) throws Exception {
                String[] lineArary=line.split("::");
                return Arrays.asList(lineArary[3],lineArary[6]);
            }
        };
        final PairFunction<String, String, Integer> word_paper=new PairFunction<String, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {

                return new Tuple2<String, Integer>(word, Integer.valueOf(1));
            }
        };
        JavaRDD<List<String>> javaRDD =lineRDD.map(LINE_MAPPER);


After doing map transformation  i am getting like this:

[[PRODUCT_SALE_ENTRY,CROC0008],[NO_STOCK_PRODUCT,CROC0005],[NO_STOCK_PRODUCT,CROC0005],[PRODUCT_SALE_WITH_BARCODE,CROC0008],[PRODUCT_SALE_WITH_BARCODE,CROC0005],[PRODUCT_SALE_WITH_BARCODE,CROC003],....]

but i want the result like..
[[NO_STOCK_PRODUCT,[CROC0005,4]],[PRODUCT_SALE_WITH_BARCODE,[CROC0008,2]],[PRODUCT_SALE_WITH_BARCODE,[CROC0005,1]],....]

请帮我。
提前致谢。

最佳答案

看起来您需要将每个键+字符串对视为一个复合键,并计算该复合键的出现次数。

你可以使用 countByValue() 来做这样的事情。 (见 JavaDoc )。但是,正如文档所述:

Note that this method should only be used if the resulting map is expected to be small, as the whole thing is loaded into the driver's memory. To handle very large results, consider using rdd.map(x => (x, 1L)).reduceByKey(_ + _)...



所以,只需 map您的每个值(例如 [PRODUCT_SALE_ENTRY,CROC0008] 到一对形式 ((PRODUCT_SALE_ENTRY,CROC0008), 1L),然后是 reduceByKey()(例如 here )。

我只在 Scala 中做过这个,而不是 Java - 我想你可能需要使用 mapToPair()例如如图here .这将给出以下形式的 RDD:
((NO_STOCK_PRODUCT,CROC0005), 4),
((PRODUCT_SALE_WITH_BARCODE,CROC0008), 2),
((PRODUCT_SALE_WITH_BARCODE,CROC0005), 1),
...

这接近你的要求。

关于hadoop - 如何在 Spark 中处理 Map<Key,value>?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43441722/

相关文章:

hadoop - 通过 '1 Click Installer'更新Community Edition中的各个CDH组件

Python:将函数输出关联到字符串,然后组合成字典

apache-spark - spark parquet 读写中的类型更改支持

hadoop - 我正在运行我的第一个MapReduce程序,输出文件似乎存在一些权限问题

scala - 使用 Apache Spark 写入 HDFS 时的输出序列

hadoop - HFileOutputFormat2.configureIncrementalLoad 与 HBASE 中的 HFileOutputFormat.configureIncrementalLoad 之间的区别

python - 我怎样才能遍历这个字典而不是对键进行硬编码

java - 在 Java 8 中将 Map 连接到 String 的最优雅方式

scala - 从RDD获取值

scala - 如何在带有循环的spark中创建when表达式