java - Spark: how to write efficient sql query to achieve this goal

Tags: java, apache-spark, apache-spark-sql

I have a JSON file whose records have the structure [{"time","currentStop","lat","lon","speed"}]. Here is a sample:

[
  {"time":"2015-06-09 23:59:59","currentStop":"xx","lat":"22.264856","lon":"113.520450","speed":"25.30"},
  {"time":"2015-06-09 21:00:49","currentStop":"yy","lat":"22.263","lon":"113.52","speed":"34.5"},
  {"time":"2015-06-09 21:55:49","currentStop":"zz","lat":"21.3","lon":"113.521","speed":"13.7"}
]

I want to produce a JSON result with the structure [{"hour","value":["currentStop","lat","lon","speed"]}], i.e. the ("currentStop", "lat", "lon", "speed") data grouped by hour. Here is the expected result for the sample above (some empty values are skipped):

[
  {"hour":0,"value":[]},
  {"hour":1,"value":[]},
  ......
  {"hour":21,"value":[{"currentStop":"yy","lat":"22.263","lon":"113.52","speed":"34.5"},{"currentStop":"zz","lat":"21.3","lon":"113.521","speed":"13.7"}]}
  {"hour":23, "value": [{"currentStop":"xx","lat":22.264856,"lon":113.520450,"speed":25.30}]},
]  

Is it possible to achieve this with a Spark SQL query?

Using Spark with the Java API and a loop, I can get what I want, but this approach is really inefficient and expensive.

Here is my code:

Dataset<Row> bus_ic = spark.read().json(file);
bus_ic.createOrReplaceTempView("view");
StringBuilder text = new StringBuilder("[");
// Re-register the view with the numeric and time columns cast to proper types.
bus_ic.select(bus_ic.col("currentStop"),
        bus_ic.col("lon").cast("double"), bus_ic.col("speed").cast("double"),
        bus_ic.col("lat").cast("double"), bus_ic.col("LINEID"),
        bus_ic.col("time").cast("timestamp"))
        .createOrReplaceTempView("view");
StringBuilder sqlString = new StringBuilder();

// One Spark job per hour: query that hour's rows, collect them to the driver,
// and append them to the JSON string by hand.
for (int i = 0; i < 24; i++) {
    sqlString.delete(0, sqlString.length());

    sqlString.append("select currentStop, speed, lat, lon from view where hour(time) = ")
            .append(i)
            .append(" group by currentStop, speed, lat, lon");
    Dataset<Row> t = spark.sql(sqlString.toString());
    text.append("{")
            .append("\"hour\":").append(i)
            .append(",\"value\":")
            .append(t.toJSON().collectAsList().toString())
            .append("}");
    if (i != 23) text.append(",");
}
text.append("]");

There must be a better way to solve this. How can I write an efficient SQL query to achieve this goal?

Best answer

You can write this in a much more concise way (Scala code):

import org.apache.spark.sql.functions._

val bus_comb = bus_ic
  .groupBy(hour(to_timestamp(col("time"))).as("hour"))
  .agg(collect_set(struct(
    col("currentStop"), col("lat"), col("lon"), col("speed")
  )).alias("value"))

bus_comb.toJSON.show(false)

// +--------------------------------------------------------------------------------------------------------------------------------------------------------+
// |value                                                                                                                                                   |
// +--------------------------------------------------------------------------------------------------------------------------------------------------------+
// |{"hour":23,"value":[{"currentStop":"xx","lat":"22.264856","lon":"113.520450","speed":"25.30"}]}                                                         |
// |{"hour":21,"value":[{"currentStop":"yy","lat":"22.263","lon":"113.52","speed":"34.5"},{"currentStop":"zz","lat":"21.3","lon":"113.521","speed":"13.7"}]}|
// +--------------------------------------------------------------------------------------------------------------------------------------------------------+

With only 24 grouped records, however, there is no opportunity for scaling out. It might be an interesting exercise, but it is not something you can really apply to a large dataset, which is where using Spark makes sense.
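Since the question asks specifically about a Spark SQL query: roughly the same aggregation can also be written as plain SQL against the "view" registered in the question (a sketch, not from the original answer; it assumes time is still a string column, so to_timestamp is used to parse it, and Spark 2.2+ for to_timestamp):

// Same grouping as above, expressed as a SQL query over the temp view.
Dataset<Row> busCombSql = spark.sql(
        "SELECT hour(to_timestamp(time)) AS hour, "
      + "collect_set(struct(currentStop, lat, lon, speed)) AS value "
      + "FROM view "
      + "GROUP BY hour(to_timestamp(time))");
busCombSql.toJSON().show(false);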

You can add the missing hours by joining against a range:

spark.range(0, 24).toDF("hour").join(bus_comb, Seq("hour"), "leftouter")
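
For completeness, a roughly equivalent Java version of the above (a sketch rather than part of the original answer; it assumes the bus_ic Dataset and column names from the question, and re-selects the columns after the join to avoid a duplicate hour column):

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Group records by the hour extracted from "time" and collect each hour's
// rows into an array-of-structs column, as in the Scala answer above.
Dataset<Row> busComb = bus_ic
        .groupBy(hour(to_timestamp(col("time"))).as("hour"))
        .agg(collect_set(struct(
                col("currentStop"), col("lat"), col("lon"), col("speed")
        )).alias("value"));

// Left outer join against hours 0-23 so that empty hours still appear
// (their "value" column will be null).
Dataset<Row> hours = spark.range(0, 24).toDF("hour");
Dataset<Row> result = hours
        .join(busComb, hours.col("hour").equalTo(busComb.col("hour")), "left_outer")
        .select(hours.col("hour"), busComb.col("value"))
        .orderBy("hour");

result.toJSON().show(false);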

Regarding "java - Spark: how to write efficient sql query to achieve this goal", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/50427212/
