java - Spark Sql 映射问题

标签 java apache-spark apache-spark-sql spark-dataframe

Sparks2/Java8 Cassandra 2 尝试从 Cassandra 读取一些数据,然后在 spark 中通过查询运行一个组。我的 DF 中只有 2 列 转换(日期),原点(字符串)

Dataset<Row> maxOrigindate = sparks.sql("SELECT origin, transdate, COUNT(*) AS cnt FROM origins  GROUP BY (origin,transdate) ORDER BY cnt DESC LIMIT 1"); `

获取错误:

 `Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 'origins.`origin`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value)`

group by 问题得到解决,如下所示删除 group by 中的 ( )

完整代码:(尝试获取某个来源/位置日期的最大交易数量)

JavaRDD<TransByDate> originDateRDD = javaFunctions(sc).cassandraTable("trans", "trans_by_date", CassandraJavaUtil.mapRowTo(TransByDate.class))
                    .select(CassandraJavaUtil.column("origin"), CassandraJavaUtil.column("trans_date").as("transdate")).limit((long)100) ;
Dataset<Row> originDF = sparks.createDataFrame(originDateRDD, TransByDate.class);
String[] columns = originDF.columns();
System.out.println("originDF columns: "+columns[0]+" "+columns[1]) ; -> transdate origin
originDF.createOrReplaceTempView("origins");

Dataset<Row> maxOrigindate = sparks.sql("SELECT origin, transdate, COUNT(*) AS cnt FROM origins  GROUP BY origin,transdate ORDER BY cnt DESC LIMIT 1"); 
List list = maxOrigindate.collectAsList(); -> Exception here
int j = list.size();

originDF :转换原点

`public static class TransByDate implements Serializable {
        private String origin;
        private Date transdate;

        public TransByDate() { }

        public TransByDate (String origin, Date transdate) { 
            this.origin = origin;
            this.transdate= transdate;

        }

        public String getOrigin() { return origin; }
        public void setOrigin(String origin) { this.origin = origin; }

        public Date getTransdate() { return transdate; }
        public void setTransdate(Date transdate) { this.transdate = transdate; }

    }

架构

root
 |-- transdate: struct (nullable = true)
 |    |-- date: integer (nullable = false)
 |    |-- day: integer (nullable = false)
 |    |-- hours: integer (nullable = false)
 |    |-- minutes: integer (nullable = false)
 |    |-- month: integer (nullable = false)
 |    |-- seconds: integer (nullable = false)
 |    |-- time: long (nullable = false)
 |    |-- timezoneOffset: integer (nullable = false)
 |    |-- year: integer (nullable = false)
 |-- origin: string (nullable = true)

异常 错误执行器:阶段 2.0 (TID 12) 中的任务 0.0 出现异常 scala.MatchError:Sun Jan 01 00:00:00 PST 2012(类 java.util.Date) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103) .... 线程“main”中的异常 org.apache.spark.SparkException:作业因阶段失败而中止:阶段 2.0 中的任务 0 失败 1 次,最近的失败:阶段 2.0 中的任务 0.0 丢失(TID 12,本地主机):scala.MatchError :Sun Jan 01 00:00:00 PST 2012(类 java.util.Date) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256) ... 驱动程序堆栈跟踪: 在 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441) ... 在 org.apache.spark.sql.Dataset$$anonfun$collectAsList$1.apply(Dataset.scala:2184) 在 org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559) 在 org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2184) 在 spark.SparkTest.sqlMaxCount(SparkTest.java:244) -> List list = maxOrigindate.collectAsList();

引起:scala.MatchError:Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256) 在 org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)

最佳答案

您遇到错误。

Caused by: scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date) at 

这个错误是因为Spark sql支持java.sql.Date类型。请查看 Spark 文档 here .也可以引用SPARK-2562 .

关于java - Spark Sql 映射问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41496851/

相关文章:

java - Twitter4J sendDirectMessage 返回页面不存在

python - 在 PySpark 上描述数据帧

hadoop - Spark - 寻找重叠值或寻找共同 friend 的变体

python - 如何使用多个值更改 Spark 中 DataFrame 的 na 值

java - Apache Spark GraphX java.lang.ArrayIndexOutOfBoundsException

sql - 如何比较两个结构相同的数据框以计算行差异

java - 生成唯一的引用号

java - 在 JUnit 测试中将 Maven 属性传递给 Spring 应用程序上下文文件

java - 非加权图中邻接表中的最短路径

linux - 部署Hadoop集群2.6后Datanode无法启动