我目前正在使用缩放进行mapreduce工作。我试图根据我在类型管道中的行中看到特定值的次数来进行阈值设置。例如,如果我在typedpipe中包含以下行:
第1栏|第2栏
'hi'| '嘿'
'hi'| ho
'hi'| ho
再见|再见
我想将每行中第1列和第2列中看到的值的频率添加到每一行。意思是输出看起来像:
第1栏|第2栏|第1栏的频率|第2栏频率
'hi'| “嘿” | 3 | 1个
'hi'| 'ho'| 3 | 2
'hi'| 'ho'| 3 | 2
再见|再见| 1 | 1个
目前,我是通过按各列对类型化管道进行分组来实现的,如下所示:
val key2Freqs = input.groupBy('key2) {
_.size('key2Freq)
}.rename('key2 -> 'key2Right).project('key2Right, 'key2Freq);
然后像这样将原始输入与key2Freqs连接起来:
.joinWithSmaller('key2 -> 'key2Right, key2Freqs, joiner = new LeftJoin)
但是,这确实很慢,在我看来对于本质上很简单的任务而言效率很低。 b / c变得特别长,我有6个不同的键要为其获取这些值,而我目前在工作中映射并加入了6个不同的时间。必须有更好的方法来执行此操作,对吗?
最佳答案
如果每列中不同值的数量小到足以将它们全部容纳到内存中,则可以将列.map
到Map[String,Int]
中,然后.groupAll.sum
一次性计算所有值(我使用的是“typed api”表示法,请不要不太记得在api字段中是如何完成此操作的,但是您知道了)。您需要使用algebird中的MapMonoid
,或者如果您不想为这件事添加依赖项,则只需编写自己的,这并不难。
然后,您将得到一个管道,其中包含生成的Map
的单个条目。现在,您可以获取原始管道,并执行.crossWithTiny
将带有计数的 map 放入其中,然后使用.map
提取单个计数。
否则,如果您无法将所有内容都保留在内存中,那么您现在正在做的事情似乎是唯一的方法……除非您实际上是在寻找近似的“头号人物”,而不是整个主题的确切数字宇宙...在这种情况下,请查看algebird的SketchMap。
关于scala - 如何在缩放时计算类型管道中行中列的频率?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35351209/