scala - 使用 Spark 并行缓存和查询数据集

标签 scala hadoop apache-spark

我有一个要求,我想缓存一个数据集,然后通过在该数据集上并行触发“N”个查询来计算一些指标,所有这些查询都计算类似的指标,只是过滤器会改变,我想运行并行执行这些查询,因为响应时间至关重要,而且我想要缓存的数据集的大小总是小于 1 GB。

我知道如何在 Spark 中缓存一个数据集,然后随后对其进行查询,但是如果我必须对同一个数据集并行运行查询,我该如何实现呢?引入 alluxio 是一种方式,但我们可以通过任何其他方式在 Spark 世界中实现同样的目标吗?

例如,对于 Java,我可以将数据缓存在内存中,然后通过使用多线程我可以实现相同的目的,但是如何在 Spark 中实现呢?

最佳答案

使用 Scala 的并行集合可以非常简单地在 Spark 的驱动程序代码中触发并行查询。这是一个最小的例子:

val dfSrc = Seq(("Raphael",34)).toDF("name","age").cache()


// define your queries, instead of returning a dataframe you could also write to a table etc
val query1: (DataFrame) => DataFrame = (df:DataFrame) => df.select("name")
val query2: (DataFrame) => DataFrame = (df:DataFrame) => df.select("age")

// Fire queries in parallel
import scala.collection.parallel.ParSeq
ParSeq(query1,query2).foreach(query => query(dfSrc).show())

编辑:

要在 map 中收集查询 ID 和结果,您应该这样做:

val resultMap  = ParSeq(
 (1,query1), 
 (2,query2)
).map{case (queryId,query) => (queryId,query(dfSrc))}.toMap

关于scala - 使用 Spark 并行缓存和查询数据集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47726441/

相关文章:

scala - Hive Warehouse连接器-读取包含带有类型数组列的Hive表

java - spark 0.9.1 on hadoop 2.2.0 maven 依赖

json - 德鲁伊 Parquet 摄取性能差

python - 我正在开发类似于 fb 聊天的应用程序。哪个框架 - Play 还是 Django?

scala - Scala 中存在类型的下划线

scala - Karaf Unresolved JDBC 约束

hadoop - Oozie 未能为失败的操作获取标准输出

mysql - sqoop import 为正确的 sql 查询提供了错误的结果

scala - 在spark scala 中将行合并为单个struct 列存在效率问题,我们如何做得更好?

Scala Spark - 任务不可序列化