java - 使用 Apache Spark 从数据框中获取不同的计数

标签 java apache-spark-sql aggregate-functions apache-spark-2.0

我有这样的数据

+--------------+---------+-------+---------+
|       dataOne|OtherData|dataTwo|dataThree|
+--------------+---------|-------+---------+
|          Best|     tree|      5|      533|
|            OK|     bush|      e|     3535|
|           MEH|      cow|      -|     3353|
|           MEH|      oak|   none|       12|
+--------------+---------+-------+---------+

我正试图将它放入

的输出中
+--------------+---------+
|       dataOne|    Count|
+--------------+---------|
|          Best|        1|
|            OK|        1|
|           Meh|        2|
+--------------+---------+

我可以毫无问题地将 dataOne 单独放入数据框中并显示其内容,以确保我只是抓取 dataOne 列, 但是,我似乎无法找到将 sql 查询转换为我需要的数据的正确语法。我尝试从整个数据集创建的临时 View 创建以下数据框

Dataset<Row> dataOneCount = spark.sql("select dataOne, count(*) from 
dataFrame group by dataOne");
dataOneCount.show();

但是 Spark 我在这方面找到的文档仅展示了如何在 spark 1.6 及之前的版本中进行此类聚合,因此我们将不胜感激。

这是我收到的错误消息,但是我已经检查了数据并且其中没有索引错误。

 java.lang.ArrayIndexOutOfBoundsException: 11

我也试过应用 functions() 方法 countDistinct

Column countNum = countDistinct(dataFrame.col("dataOne"));
Dataset<Row> result = dataOneDataFrame.withColumn("count",countNum);
result.show();

其中 dataOneDataFrame 是运行时创建的 dataFrame

select dataOne from dataFrame

但它返回一个分析异常,我对 spark 还是个新手,所以我不确定我评估 countDistinct 方法的方式/时间是否有错误

编辑:为了澄清,显示的第一个表是我通过读取文本文件并对其应用自定义模式而创建的数据帧的结果(它们仍然都是字符串)

Dataset<Row> dataFrame 

这是我的完整代码

public static void main(String[] args) {


    SparkSession spark = SparkSession
            .builder()
            .appName("Log File Reader")
            .getOrCreate();

    //args[0] is the textfile location
    JavaRDD<String> logsRDD = spark.sparkContext()
            .textFile(args[0],1)
            .toJavaRDD();

    String schemaString = "dataOne OtherData dataTwo dataThree";

    List<StructField> fields = new ArrayList<>();
    String[] fieldName = schemaString.split(" ");


    for (String field : fieldName){
        fields.add(DataTypes.createStructField(field, DataTypes.StringType, true));
    }
    StructType schema = DataTypes.createStructType(fields);

    JavaRDD<Row> rowRDD = logsRDD.map((Function<String, Row>) record -> {
       String[] attributes = record.split(" ");
       return RowFactory.create(attributes[0],attributes[1],attributes[2],attributes[3]);
    });


    Dataset<Row> dF = spark.createDataFrame(rowRDD, schema);

    //first attempt
    dF.groupBy(col("dataOne")).count().show();

    //Trying with a sql statement
    dF.createOrReplaceTempView("view");
    dF.sparkSession().sql("select command, count(*) from view group by command").show();

最有可能想到的是使用 RowFactory 返回行的 lambda 函数?这个想法似乎不错,但我不确定它是否真的成立,或者是否有其他方法可以做到。除此之外,我很困惑

样本数据

best tree 5 533
OK bush e 3535
MEH cow - 3353
MEH oak none 12

最佳答案

为方便起见,使用 Scala 语法。它与 Java 语法非常相似:

// Input data
val df = {
  import org.apache.spark.sql._
  import org.apache.spark.sql.types._
  import scala.collection.JavaConverters._

  val simpleSchema = StructType(
    StructField("dataOne", StringType) ::
    StructField("OtherData", StringType) ::
    StructField("dataTwo", StringType) ::
    StructField("dataThree", IntegerType) :: Nil)

  val data = List(
    Row("Best", "tree", "5", 533),
    Row("OK", "bush", "e", 3535),
    Row("MEH", "cow", "-", 3353),
    Row("MEH", "oak", "none", 12)
  )

  spark.createDataFrame(data.asJava, simpleSchema)
}

df.show
+-------+---------+-------+---------+
|dataOne|OtherData|dataTwo|dataThree|
+-------+---------+-------+---------+
|   Best|     tree|      5|      533|
|     OK|     bush|      e|     3535|
|    MEH|      cow|      -|     3353|
|    MEH|      oak|   none|       12|
+-------+---------+-------+---------+
df.groupBy(col("dataOne")).count().show()
+-------+-----+
|dataOne|count|
+-------+-----+
|    MEH|    2|
|   Best|    1|
|     OK|    1|
+-------+-----+

我可以使用 S3 上的四行数据文件提交上面给出的 Java 代码,它工作正常:

$SPARK_HOME/bin/spark-submit \
  --class sparktest.FromStackOverflow \
  --packages "org.apache.hadoop:hadoop-aws:2.7.3" \
  target/scala-2.11/sparktest_2.11-1.0.0-SNAPSHOT.jar "s3a://my-bucket-name/sample.txt"

关于java - 使用 Apache Spark 从数据框中获取不同的计数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45866861/

相关文章:

java - 在 ArrayList 中使用接口(interface)

java - 我的 pom 中有一个依赖项,该依赖项与其他 pom 冲突

apache-spark - WARN ReliableDeliverySupervisor : Association with remote system has failed, 地址现在被门控 [5000] 毫秒。原因:[已解除关联]

pyspark.sql.functions.col 和 pyspark.sql.functions.lit 之间的 PySpark 区别

sql - 在时间间隔内选择第一行和最后一行

java - 禁用上下文 LOB 创建作为 createClob() 方法引发错误

python - 如何使用 Spark 函数 PySpark 将字符串转换为列表

sql - 子查询中的总和

sql - postgresql 中的第一个和最后一个值聚合函数可以正确处理 NULL 值

java - 简单的二叉树问题