我有一个 json 文件,其结构为 [{"time","currentStop","lat","lon","speed"}],这是一个示例:
[
{"time":"2015-06-09 23:59:59","currentStop":"xx","lat":"22.264856","lon":"113.520450","speed":"25.30"},
{"time":"2015-06-09 21:00:49","currentStop":"yy","lat":"22.263","lon":"113.52","speed":"34.5"},
{"time":"2015-06-09 21:55:49","currentStop":"zz","lat":"21.3","lon":"113.521","speed":"13.7"}
]
我想获得具有结构 [{"hour","value":["currentStop","lat","lon","speed"]}] 的 json 结果。结果显示不同的每小时数据(“currentStop”,“lat”,“lon”,“speed”)。这是示例的结果(跳过一些空值):
[
{"hour":0,"value":[]},
{"hour":1,"value":[]},
......
{"hour":21,"value":[{"currentStop":"yy","lat":"22.263","lon":"113.52","speed":"34.5"},{"currentStop":"zz","lat":"21.3","lon":"113.521","speed":"13.7"}]}
{"hour":23, "value": [{"currentStop":"xx","lat":22.264856,"lon":113.520450,"speed":25.30}]},
]
是否可以使用spark-sql查询来实现这一点?
我使用spark配合Java API,配合循环,我可以得到我想要的东西,但这种方式确实效率低下,而且成本很高。
这是我的代码:
Dataset<Row> bus_ic=spark.read().json(file);
bus_ic.createOrReplaceTempView("view");
StringBuilder text = new StringBuilder("[");
bus_ic.select(bus_ic.col("currentStop"),
bus_ic.col("lon").cast("double"), bus_ic.col("speed").cast("double"),
bus_ic.col("lat").cast("double"),bus_ic.col("LINEID"),
bus_ic.col("time").cast("timestamp"))
.createOrReplaceTempView("view");
StringBuilder sqlString = new StringBuilder();
for(int i = 0; i<24; i++){
sqlString.delete(0,sqlString.length());
sqlString.append("select currentStop, speed, lat, lon from view where hour(time) = ")
.append(i)
.append(" group by currentStop, speed, lat, lon");
Dataset<Row> t = spark.sql(sqlString.toString());
text.append("{")
.append("\"h\":").append(i)
.append(",\"value\":")
.append(t.toJSON().collectAsList().toString())
.append("}");
if(i!=23) text.append(",");
}
text.append("]");
必须有其他方法来解决这个问题。如何编写高效的sql查询来实现这个目标?
最佳答案
您可以以更简洁的方式编写代码(Scala 代码):
val bus_comb = bus_ic
.groupBy(hour(to_timestamp(col("time"))).as("hour"))
.agg(collect_set(struct(
col("currentStop"), col("lat"), col("lon"), col("speed")
)).alias("value"));
bus_comb.toJSON.show(false);
// +--------------------------------------------------------------------------------------------------------------------------------------------------------+
// |value |
// +--------------------------------------------------------------------------------------------------------------------------------------------------------+
// |{"hour":23,"value":[{"currentStop":"xx","lat":"22.264856","lon":"113.520450","speed":"25.30"}]} |
// |{"hour":21,"value":[{"currentStop":"yy","lat":"22.263","lon":"113.52","speed":"34.5"},{"currentStop":"zz","lat":"21.3","lon":"113.521","speed":"13.7"}]}|
// +--------------------------------------------------------------------------------------------------------------------------------------------------------+
但是只有 24 条分组记录,因此没有机会进行扩展。这可能是一个有趣的练习,但它并不是真正可以应用于大型数据集的东西,而在大型数据集上使用 Spark 才有意义。
您可以通过加入范围
来添加缺失的小时数:
spark.range(0, 24).toDF("hour").join(bus_comb, Seq("hour"), "leftouter")
关于java - Spark : how to write efficient sql query to achieve this goal,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50427212/