例如,我们有一个包含过去 3 年 2000 个股票代码收盘价的拼花文件,我们要计算每个代码的 5 天移动平均线。
所以我创建了一个 spark SQLContext 然后
val marketData = sqlcontext.sql("select DATE, SYMBOL, PRICE from stockdata order by DATE").cache()
要获取符号列表,
val symbols = marketData.select("SYMBOL").distinct().collect()
这是 for 循环:
for (symbol <- symbols) {
marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()
}
显然,在 spark 上执行 for 循环很慢,而且
save()
对于每个小的结果也会减慢进程(我已经尝试在 for 循环之外定义一个 var result
并将所有输出联合起来进行 IO 操作,但是我遇到了 stackoverflow 异常),那么我如何并行化 for 循环并优化IO操作?
最佳答案
您编写的程序在驱动程序(“主”) Spark 节点中运行。如果您在并行结构 (RDD) 上运行,则此程序中的表达式只能并行化。
尝试这个:
marketdata.rdd.map(symbolize).reduceByKey{ case (symbol, days) => days.sliding(5).map(makeAvg) }.foreach{ case (symbol,averages) => averages.save() }
哪里
symbolize
接受一行符号 x 天并返回一个元组(符号,天)。
关于scala - 如何使用scala并行化spark中的for循环?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37005672/