java - Executing native SQL with count, having and orderby in Spark SQL

Tags: java dataframe apache-spark apache-spark-sql

I am new to apache-spark.

I put together this query using a combination of the aggregate function count with having and orderby. It is a valid SQL query that I referenced from here.

I created a dataframe from a parquet file and then tried to execute this query:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
                .builder()
                .appName("Java Spark SQL basic example")
                .config("spark.master", "local")
                .getOrCreate();
final Dataset<Row> dataset = spark.read().parquet("src/main/resources/test2.parquet");
dataset.createOrReplaceTempView("customers");
final Dataset<Row> dataset1 = spark.sql("SELECT count(customerid), customerid, country FROM customers " +
        "GROUP BY country, customerid HAVING count(customerid) > 5 ORDER BY count(customerid) DESC");
dataset1.show();

But I am getting this error:

java.lang.UnsupportedOperationException: Cannot evaluate expression: count(input[1, string, true])

at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:261)
at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.doGenCode(interfaces.scala:87)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:105)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$$anonfun$3.apply(GenerateOrdering.scala:83)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$$anonfun$3.apply(GenerateOrdering.scala:82)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.genComparisons(GenerateOrdering.scala:82)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.create(GenerateOrdering.scala:164)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.create(GenerateOrdering.scala:43)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1193)
at org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering.<init>(GenerateOrdering.scala:207)
at org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering.<init>(GenerateOrdering.scala:204)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:135)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
at org.apache.spark.sql.Dataset.show(Dataset.scala:746)
at org.apache.spark.sql.Dataset.show(Dataset.scala:705)
at org.apache.spark.sql.Dataset.show(Dataset.scala:714)

Output of queryExecution:

== Parsed Logical Plan ==
'Sort ['count('customerid) DESC NULLS LAST], true
+- 'Filter ('count('customerid) > 5)
   +- 'Aggregate ['country, 'customerid], [unresolvedalias('count('customerid), None), 'customerid, 'country]
      +- 'UnresolvedRelation `customers`

== Analyzed Logical Plan ==
count(customerid): bigint, customerid: string, country: string
Sort [count(customerid#0) DESC NULLS LAST], true
+- Project [count(customerid)#42L, customerid#0, country#6]
   +- Filter (count(customerid#0)#45L > cast(5 as bigint))
      +- Aggregate [country#6, customerid#0], [count(customerid#0) AS count(customerid)#42L, customerid#0, country#6, count(customerid#0) AS count(customerid#0)#45L]
         +- SubqueryAlias `customers`
            +- Relation[customerid#0,customername#1,contactname#2,address#3,city#4,postalcode#5,country#6] parquet

== Optimized Logical Plan ==
Sort [count(customerid#0) DESC NULLS LAST], true
+- Project [count(customerid)#42L, customerid#0, country#6]
   +- Filter (count(customerid#0)#45L > 5)
      +- Aggregate [country#6, customerid#0], [count(customerid#0) AS count(customerid)#42L, customerid#0, country#6, count(customerid#0) AS count(customerid#0)#45L]
         +- Project [customerid#0, country#6]
            +- Relation[customerid#0,customername#1,contactname#2,address#3,city#4,postalcode#5,country#6] parquet

== Physical Plan ==
*(3) Sort [count(customerid#0) DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(count(customerid#0) DESC NULLS LAST, 200)
   +- *(2) Project [count(customerid)#42L, customerid#0, country#6]
      +- *(2) Filter (count(customerid#0)#45L > 5)
         +- *(2) HashAggregate(keys=[country#6, customerid#0], functions=[count(customerid#0)], output=[count(customerid)#42L, customerid#0, country#6, count(customerid#0)#45L])
            +- Exchange hashpartitioning(country#6, customerid#0, 200)
               +- *(1) HashAggregate(keys=[country#6, customerid#0], functions=[partial_count(customerid#0)], output=[country#6, customerid#0, count#53L])
                  +- *(1) FileScan parquet [customerid#0,country#6] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/D:.../src/main/resources/test2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<customerid:string,country:string>
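
For reference, plans like these can be printed directly from the Dataset; a minimal way to reproduce this output, assuming the dataset1 variable from above:

// Prints the parsed, analyzed, optimized, and physical plans.
dataset1.explain(true);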

And the output of the data and the schema:

+----------+--------------------+------------------+--------------------+-----------+----------+-------+
|customerid|        customername|       contactname|             address|       city|postalcode|country|
+----------+--------------------+------------------+--------------------+-----------+----------+-------+
|         1| Alfreds Futterkiste|      Maria Anders|       Obere Str. 57|     Berlin|     12209|Germany|
|         2|Ana Trujillo Empa...|      Ana Trujillo|Avda. de la Const...|México D.F.|      5021| Mexico|
|         3|Antonio Moreno Ta...|    Antonio Moreno|      Mataderos 2312|México D.F.|      5023| Mexico|
|         4|     Around the Horn|      Thomas Hardy|     120 Hanover Sq.|     London|   WA1 1DP|     UK|
|         5|  Berglunds snabbköp|Christina Berglund|      Berguvsvägen 8|      Luleå|  S-958 22| Sweden|
+----------+--------------------+------------------+--------------------+-----------+----------+-------+

root
 |-- customerid: string (nullable = true)
 |-- customername: string (nullable = true)
 |-- contactname: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- postalcode: string (nullable = true)
 |-- country: string (nullable = true)

I don't understand what is going wrong here.

Best Answer

Try this - give the aggregate an alias and refer to the alias in HAVING and ORDER BY, so the sort runs on the already-computed column instead of trying to re-evaluate count() during ordering:

dataset.createOrReplaceTempView("customers");
final Dataset<Row> dataset1 = spark.sql("SELECT count(customerid) as count, customerid, country FROM customers " +
        "GROUP BY country, customerid HAVING count > 5 ORDER BY count DESC");
dataset1.show();
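
For comparison, here is a minimal sketch of the same aggregation through the Dataset API, assuming the spark and dataset variables from the question (the alias name cnt is arbitrary). It makes explicit why the fix works: the count is computed once under an alias, and the filter and sort operate on that column rather than on a fresh aggregate expression.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;

// Group, aggregate once under an alias, then filter and sort on the
// alias, so no aggregate expression has to be evaluated while sorting.
final Dataset<Row> dataset2 = dataset
        .groupBy(col("country"), col("customerid"))
        .agg(count("customerid").alias("cnt"))
        .filter(col("cnt").gt(5))
        .orderBy(col("cnt").desc());
dataset2.show();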

Regarding "java - Executing native SQL with count, having and orderby in Spark SQL", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/62221075/
