function - 如何在 Spark Java 中使用分析/窗口函数?

标签 function apache-spark analytical

我正在尝试在 Spark Java 中使用分析/窗口函数 last_value。

网易查询:

select sno, name, addr1, addr2, run_dt, 
last_value(addr1 ignore nulls) over (partition by sno, name, addr1, addr2, run_dt order by beg_ts , end_ts rows between unbounded preceding and unbounded following  ) as last_addr1
from daily

我们要在 Spark Java 中实现这个查询(不使用 HiveSQLContext):
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.execution.WindowFunctionFrame;

    SparkConf conf = new SparkConf().setMaster("local").setAppName("Agg");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);


    JavaRDD<Stgdailydtl> daily = sc.textFile("C:\\Testing.txt").map(
              new Function<String, Stgdailydtl>() {
                  private static final long serialVersionUID = 1L;
                public Stgdailydtl call(String line) throws Exception {
                  String[] parts = line.split(",");

                  Stgdailydtl daily = new Stgdailydtl();
                  daily.setSno(Integer.parseInt(parts[0].trim()));
                  .....

                  return daily;
                }
              });
DataFrame schemaDailydtl = sqlContext.createDataFrame(daily, Stgdailydtl.class);
schemaDailydtl.registerTempTable("daily");
WindowSpec ws = Window.partitionBy("sno, name, addr1, addr2, run_dt").orderBy("beg_ts , end_ts").rowsBetween(0, 100000);
DataFrame df = sqlContext.sql("select sno, name, addr1, addr2, run_dt "
            + "row_number() over(partition by mach_id, msrmt_gbl_id, msrmt_dsc, elmt_dsc, end_cptr_dt order by beg_cptr_ts, end_cptr_ts) from daily ");

}

}

错误:
Exception in thread "main" java.lang.RuntimeException: [1.110] failure: ``union'' expected but `(' found

select stg.mach_id, stg.msrmt_gbl_id, stg.msrmt_dsc, stg.elmt_dsc, stg.elmt_dsc_grp_concat, row_number() over(partition by mach_id, msrmt_gbl_id, msrmt_dsc, elmt_dsc, end_cptr_dt order by beg_cptr_ts, end_cptr_ts) from stgdailydtl stg 
                                                                                                             ^
    at scala.sys.package$.error(package.scala:27)

我无法理解如何使用 WindowSpec/Window 对象。请就此提出建议。
谢谢你的帮助

最佳答案

您正在混合数据帧语法和 sql 语法 - 特别是您创建了一个 WindowSpec 但没有使用它。

进口 org.apache.spark.sql.functions获取 row_number函数,然后创建您要选择的列:

Column rowNum = functions.row_number().over(ws)

然后使用数据框 API 选择它:
df.select(each, column, you, want, rowNum)

我的语法可能有点不对,我习惯了 Scala 或 python,但要点是这样的。

关于function - 如何在 Spark Java 中使用分析/窗口函数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33319279/

相关文章:

scala - 基于两列的spark join操作

python - 在 sympy 或替代方案中导出点云方程

Oracle 解析函数

python - 将父类(super class)实例转换为子类实例

scala - 在 SPARK 中使用 elasticsearch-spark 从 Elasticsearch 读取数据时如何转换类型

c++ - 在 C++ 中将列表框作为函数参数传递

json - Spark 运行错误 java.lang.NoClassDefFoundError : org/codehaus/jackson/annotate/JsonClass

SQL 窗口函数 : does the order of appearance for columns matter in partition by?

c++ - 'this' 参数的类型为 const 但函数未标记为 const

c - sizeof(函数名)