java - 如何累积运行 Spark sql 聚合器?

标签 java apache-spark apache-spark-sql

我目前正在开发一个包含 Spark 数据集(Java 语言)的项目,在该项目中,我必须创建一个新列,该新列源自对所有先前行运行的累加器。

我一直在使用自定义 UserDefinedAggregationFunction 来实现此功能从 unboundedPreceding 的 window 上方至currentRow .

事情是这样的:

df.withColumn("newColumn", customAccumulator
    .apply(columnInputSeq)
    .over(customWindowSpec));

但是,出于类型安全原因和通常更干净的代码,我真的更喜欢使用类型化数据集。即:使用 org.apache.spark.sql.expressions.Aggregator 执行相同的操作超过Dataset<CustomType> 。这里的问题是我已经浏览了所有文档,但无法弄清楚如何使其以与上面相同的方式运行(即我只能获得整个列的最终聚合,而不是每行的累积状态) .

我想要做的事情可能吗?如果可能的话,如何做?

为清楚起见添加示例:

初始表:

+-------+------+------+
| Index | Col1 | Col2 |
+-------+------+------+
|     1 | abc  | def  |
|     2 | ghi  | jkl  |
|     3 | mno  | pqr  |
|     4 | stu  | vwx  |
+-------+------+------+

然后使用示例聚合操作: 首先反转累加器,在 Col1 前面附加 Col2 并返回该值,并将其设置为累加器。

+-------+------+------+--------------------------+
| Index | Col1 | Col2 |       Accumulator        |
+-------+------+------+--------------------------+
|     1 | abc  | def  | abcdef                   |
|     2 | ghi  | jkl  | ghifedcbajkl             |
|     3 | mno  | pqr  | mnolkjabcdefihgpqr       |
|     4 | stu  | vwx  | sturpqghifedcbajklonmvwx |
+-------+------+------+--------------------------+

使用 UserDefinedAggregateFunction我已经能够制作这个,但带有 Aggregator我只能得到最后一行。

最佳答案

你不

我的消息来源是一位 friend ,他一直在研究与此相同的问题,现在得出的结论是这是不可能的

关于java - 如何累积运行 Spark sql 聚合器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57145552/

相关文章:

java - 二维数组 Java 的越界错误

python - 协同过滤中的多重特征——spark

apache-spark - 处理看不见的分类字符串 Spark CountVectorizer

scala - 如何读取多个 Parquet 表?

apache-spark - 订购 Pyspark 窗口时缺少数据

java - 将 Room 持久性库与 sqlite 一起使用

java - Jackson:生成带有引用的模式

apache-spark - 从 Databricks 中的 UDF 内部查询 Delta Lake

python - 断言错误 : col should be Column

java - Eclipse Tomcat 发布不工作