我有一个包含 2 列的 Spark Dataframe,如下所示。
Date | Area
1/1/2016 | 1
3/1/2016 | 4
1/1/2016 | 1
5/1/2016 | 2
1/1/2016 | 3
1/1/2016 | 1
3/1/2016 | 4
1/1/2016 | 2
3/1/2016 | 3
3/1/2016 | 3
1/1/2016 | 4
1/1/2016 | 4
1/1/2016 | 2
我想要一个输出
Day: 1/1/2016 -> There are 3 rows at Area1
-> There are 2 rows at Area2
-> There are 1 rows at Area3
-> There are 2 rows at Area4
Day: 3/1/2016 -> There are 0 rows at Area1
-> There are 0 rows at Area2
-> There are 2 rows at Area3
-> There are 2 rows at Area4
Day: 5/1/2016 -> ..........
我的 java 8 代码是:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.sql.*;
public class Main {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("My 1st Spark app");
conf.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
Dataset<Row> df = sparkSession.read().option("header", true).option("inferSchema", "true").option("timestampFormat", "yyyy-MM-dd hh:mm:ss").csv("hdfs://quickstart.cloudera:8020//user//cloudera//fares.csv");
Dataset<Row> df = df_date_column.groupBy("Date").count();
但是我有一个按日期而不是按区域分组的结果。那么如何按日期和区域进行分组呢?
最佳答案
这可以使用 Spark SQL 窗口函数以及带有收集函数的 Spark Dataframe 上的每个循环来完成(对于大数据来说不理想,因为作业会变慢)。下面是 pyspark 代码,您可以将其转换为 Java,因为主 Spark sql 查询不会改变。稍后使用 java for 循环并访问数组上的每个元素,即sparkDataFrame.collect()。
from pyspark.sql.functions import *
data.createOrReplaceTempView("tmp")
# final = data.groupBy("Area").agg(count("Date"))
# final.show(20,False)
df = spark.sql("""
SELECT distinct date,
area,
count(area) over (partition by date,area order by date,area) as area_cnt,
min(area) over (partition by date order by date,area) as area_first,
max(area) over (partition by date order by date,area desc) as area_last
from tmp
order by date, area
""")
df.show(20,False)
for i in df.collect() :
if i.area_first == i.area :
print("Day: " + i.date + " -> There are " + str(i.area_cnt) + " rows at Area" + str(i.area))
else :
print(" -> There are " + str(i.area_cnt) + " rows at Area" + str(i.area))
InputData :
+--------+----+--------+----------+---------+
|date |area|area_cnt|area_first|area_last|
+--------+----+--------+----------+---------+
|1/1/2016|1 |3 |1 |4 |
|1/1/2016|2 |2 |1 |4 |
|1/1/2016|3 |1 |1 |4 |
|1/1/2016|4 |2 |1 |4 |
|3/1/2016|3 |2 |3 |4 |
|3/1/2016|4 |2 |3 |4 |
|5/1/2016|2 |1 |2 |2 |
+--------+----+--------+----------+---------+
Output :
Day: 1/1/2016 -> There are 3 rows at Area1
-> There are 2 rows at Area2
-> There are 1 rows at Area3
-> There are 2 rows at Area4
Day: 3/1/2016 -> There are 2 rows at Area3
-> There are 2 rows at Area4
Day: 5/1/2016 -> There are 1 rows at Area2
关于java - 组 - 从 Dataframe 中的 2 列进行计数 - Spark Java,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59377428/