java - Join, aggregate, then select specific columns in Apache Spark

Tags: java apache-spark apache-spark-sql

The products and the corresponding sale details are loaded correctly from CSV files, as shown below:

    Dataset<Row> dfProducts = sparkSession.read()
            .option("mode", "DROPMALFORMED")
            .option("header", "true")
            .option("inferSchema", "true")
            .option("charset", "UTF-8")
            .csv(new ClassPathResource("products.csv").getURL().getPath());
    Dataset<Row> dfSaledetails = sparkSession.read()
            .option("mode", "DROPMALFORMED")
            .option("header", "true")
            .option("inferSchema", "true")
            .option("charset", "UTF-8")
            .csv(new ClassPathResource("saledetails.csv").getURL().getPath());

Products has columns (product_id, product_name, ...). Sale details has columns (product_id, amount, ...).

What I need to achieve is to join the two datasets on the common column (product_id), group by product_id, sum the amount column, and then select/display only specific columns (product_name and the summed result).

Here is my attempt:

    Dataset<Row> dfSalesTotals = dfSaledetails
            .join(dfProducts, dfSaledetails.col("product_id").equalTo(dfProducts.col("product_id")))
            .groupBy(dfSaledetails.col("product_id"))
            .agg(sum(dfSaledetails.col("amount")).alias("total_amount"))
            .select(dfProducts.col("product_name"), col("total_amount"));
    dfSalesTotals.show();

This throws the following error:

Caused by: org.apache.spark.sql.AnalysisException: Resolved attribute(s) product_name#215 missing from product_id#272,total_amount#499 in operator 
!Project [product_name#215, total_amount#499].;;
!Project [product_name#215, total_amount#499]
+- Aggregate [product_id#272], [product_id#272, sum(amount#277) AS total_amount#499]
   +- Join Inner, (product_id#272 = product_id#212)
      :- Relation[sale_detail_auto_id#266,sale_auto_id#267,sale_id#268,agent_id#269,sale_detail_id#270,inventory_id#271,product_id#272,unit_cost#273,unit_price#274,vat#275,quantity#276,amount#277,promotion_id#278,discount#279] csv
  +- Relation[product_id#212,user_group_id_super_owner#213,product_category#214,product_name#215,product_type#216,product_code#217,distributor_code#218,product_units#219,product_unitCost#220,product_manufacturer#221,product_distributor#222,create_date#223,update_date#224,vat#225,product_weight#226,carton_size#227,product_listStatus#228,active_status#229,distributor_type#230,bundle_type#231,barcode_type#232,product_family_id#233] csv

Best Answer

The select fails because the output of the Aggregate contains only the grouping column (product_id) and the aggregated column (total_amount); product_name is no longer available at that point. If you want to keep product_name, it has to be part of the aggregation: either add it to the groupBy

    .groupBy(
      dfSaledetails.col("product_id"),
      col("product_name"))

or carry it through in agg

    .agg(
      sum(dfSaledetails.col("amount")).alias("total_amount"),
      first(col("product_name")).alias("product_name"))
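Putting the groupBy approach together, here is a minimal sketch of the corrected pipeline. The dataset and column names are taken from the question; the wrapping class and method are hypothetical scaffolding, and it assumes the two datasets are loaded as shown above:

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.sum;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class SalesTotals {

    // Sketch: dfSaledetails and dfProducts are assumed to be the datasets
    // loaded from saledetails.csv and products.csv in the question.
    static Dataset<Row> totalsByProduct(Dataset<Row> dfSaledetails,
                                        Dataset<Row> dfProducts) {
        return dfSaledetails
                // join the two datasets on their common product_id column
                .join(dfProducts,
                      dfSaledetails.col("product_id")
                                   .equalTo(dfProducts.col("product_id")))
                // include product_name in the grouping key so it survives
                // the aggregation and can be selected afterwards
                .groupBy(dfSaledetails.col("product_id"),
                         dfProducts.col("product_name"))
                .agg(sum(dfSaledetails.col("amount")).alias("total_amount"))
                .select(col("product_name"), col("total_amount"));
    }
}
```

Calling `totalsByProduct(dfSaledetails, dfProducts).show()` then displays one row per product with its summed amount. Grouping by both columns is safe here because product_name is functionally dependent on product_id, so it does not change the groups.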

A similar question on this topic, "java - Join, aggregate, then select specific columns in Apache Spark", can be found on Stack Overflow: https://stackoverflow.com/questions/53852194/
