java - 如何使用spark java api在cassandra表中进行平均值、最大总和等聚合

标签 java apache-spark cassandra datastax datastax-java-driver

我在 cassandra 数据库中有大量数据,我想使用 Spark java api 对某些列名称名称进行平均值、最大值和总和等聚合

我尝试过如下

cassandraRowsRDD
  .select("name", "age", "ann_salaray", "dept","bucket", "resourceid", "salaray")
  .where("timestamp = ?", "2018-01-09 00:00:00")
  .withAscOrder()

我看到了这个方法 - .aggregate(zeroValue, seqOp, CombOp),但不知道如何使用它

预期:

max(salary column name)
avg(salary column name)

我尝试过使用CQL,但由于数据量太大而失败

任何人都可以给我一个使用 Spark java api 在 cassandra 表中聚合的示例

最佳答案

第一个参数提供所谓的“零值”,用于初始化“累加器”,第二个参数 - 从 RDD 中获取累加器和单个值的函数,第三个参数 - 接受 2 个累加器并将它们组合起来的函数。

对于您的任务,您可以使用类似的东西(伪代码)

res = rdd.aggregate((0,0,0),
   (acc, value) => (acc._1 + 1,
                    acc._2 + value.salary,
                    if (acc._3 > value.salary) then acc._3 else value.salary),
   (acc1, acc2) => (acc1._1 + acc2._1,
                    acc1._2 + acc2._2,
                    if (acc1._3 > acc2._3) then acc1._3 else acc2._3))
 val avg = res._2/res._1
 val max = res._3

在这种情况下,我们有:

  1. (0,0,0) - 3 个元素的元组,分别表示:RDD 中的元素数量、所有工资的总和以及最高工资
  2. 从累加器和值生成新元组的函数
  3. 组合 2 个元组的函数

然后有了条目数、工资总额和最大值,我们就可以找到所有必要的数据。

关于java - 如何使用spark java api在cassandra表中进行平均值、最大总和等聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48886382/

相关文章:

java检测何时创建或关闭任何窗口

hadoop - Spark parquet 数据帧分区数

apache-spark - apache spark 2.2 没有可用的 toString 方法

elasticsearch - 如何在conf/elasticsearch.yml中定义datacenter.group以便运行Elassandra多数据中心?

scala - Spark 1.5.1、Cassandra 连接器 1.5.0-M2、Cassandra 2.1、Scala 2.10、NoSuchMethodError Guava 依赖项

java - Mockito:验证any()参数是否相同

java - 使用 Spring RequestContextHolder 时出现 IllegalStateException

java - XMLStreamWriter2.closeCompletely()V NoSuchMethodError

java - 如何展平数组中的嵌套结构?

cassandra - 在 Mac OS X 上停止 cassandra 服务器