scala - 如何使用scala并行化spark中的for循环?

标签 scala apache-spark apache-spark-sql spark-dataframe

例如,我们有一个包含过去 3 年 2000 个股票代码收盘价的拼花文件,我们要计算每个代码的 5 天移动平均线。

所以我创建了一个 spark SQLContext 然后

val marketData = sqlcontext.sql("select DATE, SYMBOL, PRICE from stockdata order by DATE").cache()

要获取符号列表,
val symbols = marketData.select("SYMBOL").distinct().collect()

这是 for 循环:
for (symbol <- symbols) {
  marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()
}

显然,在 spark 上执行 for 循环很慢,而且 save()对于每个小的结果也会减慢进程(我已经尝试在 for 循环之外定义一个 var result 并将所有输出联合起来进行 IO 操作,但是我遇到了 stackoverflow 异常),那么我如何并行化 for 循环并优化IO操作?

最佳答案

您编写的程序在驱动程序(“主”) Spark 节点中运行。如果您在并行结构 (RDD) 上运行,则此程序中的表达式只能并行化。

尝试这个:

marketdata.rdd.map(symbolize).reduceByKey{ case (symbol, days) => days.sliding(5).map(makeAvg)  }.foreach{ case (symbol,averages) => averages.save() }

哪里symbolize接受一行符号 x 天并返回一个元组(符号,天)。

关于scala - 如何使用scala并行化spark中的for循环?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37005672/

相关文章:

scala - 为什么在 Scala 中不推荐 while 循环

python - PySpark 1.5 如何将时间戳从秒截断到最近的分钟

java - 使用 Apache Spark SQL 和 Java 直接运行 sql 查询

java - Spark2数据帧获取两个表的差异,并排除一些

Scala函数错误: type mismatch

apache-spark - 具有太大而无法放入内存的查找表的批处理作业 (Spark)

hadoop - 在 HIVE 中,在 4 列上连接 2 个表时什么能提供最佳性能?键列类型 String、Int 或 binary?

scala - 在 Scala 中连接两个长度不等的列表

scala - 如何在 Apache Spark 中执行 UPSERT 或 MERGE 操作?

generics - 高阶泛型函数中的 ClassCastException