scala - 使用 Scala 读取 Cassandra 中的并行性

标签 scala apache-spark concurrency cassandra

我正在尝试使用 spark 从 Cassandra 表调用并行读取。但是我无法调用并行性,因为在任何给定时间只发生一次读取。应该遵循什么方法来实现相同的目标?

最佳答案

我建议您采用以下方法 source Russell Spitzer's Blog

使用部分扫描联合手动划分我们的分区: 将任务推送给最终用户也是一种可能性(也是当前的解决方法)。大多数最终用户已经理解为什么他们有长分区并且通常知道他们的列值所在的域。这使得他们可以手动划分提出一个请求,以便它切碎大分区。

例如,假设用户知道集群 c 列的范围从 1 到 1000000。他们可以编写如下代码

val minRange = 0
val maxRange = 1000000
val numSplits = 10
val subSize = (maxRange - minRange) / numSplits

sc.union(
  (minRange to maxRange by subSize)
    .map(start => 
      sc.cassandraTable("ks", "tab")
        .where("c > $start and c < ${start + subSize}"))
)

每个 RDD 都包含一组唯一的任务,仅绘制完整分区的一部分。 union 操作将所有这些不同的任务连接到一个 RDD 中。任何单个 Spark 分区将从单个 Cassandra 分区提取的最大行数将限制为 maxRange/numSplits。这种方法虽然需要用户干预,但会保留局部性并且仍会最大限度地减少磁盘扇区之间的跳跃。

还有 read-tuning-parameters

Read tuning parameters

关于scala - 使用 Scala 读取 Cassandra 中的并行性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56638507/

相关文章:

java - 在 Eclipse IDE 中使用 Scala 代码。内部编译器错误

scala - Spark Streaming groupByKey 和 updateStateByKey 实现

linux - 如果进程数大于内核数的一半,为什么性能会下降?

scala - 寻找游戏中 Json 的隐含值

scala - 在reactivemongo中为集合创建唯一索引

scala - 当 reflect.runtime.universe._ 存在时,为什么 reflect.runtime.universe.RuntimeClass 会推断为 Nothing?

java - 后台线程调用的警告消息 Transaction afterCompletion

windows - 为什么 spark-shell 失败并显示 "The filename, directory name, or volume label syntax is incorrect."?

java - G1 GC 单个,很长的年轻 GC 发生在 ParallelGCThreads=1

java Fork/Join 关于堆栈使用的说明