我目前正在开发一个包含 Spark 数据集(Java 语言)的项目,在该项目中,我必须创建一个新列,该新列源自对所有先前行运行的累加器。
我一直在使用自定义 UserDefinedAggregationFunction
来实现此功能从 unboundedPreceding
的 window 上方至currentRow
.
事情是这样的:
df.withColumn("newColumn", customAccumulator
.apply(columnInputSeq)
.over(customWindowSpec));
但是,出于类型安全原因和通常更干净的代码,我真的更喜欢使用类型化数据集。即:使用 org.apache.spark.sql.expressions.Aggregator
执行相同的操作超过Dataset<CustomType>
。这里的问题是我已经浏览了所有文档,但无法弄清楚如何使其以与上面相同的方式运行(即我只能获得整个列的最终聚合,而不是每行的累积状态) .
我想要做的事情可能吗?如果可能的话,如何做?
为清楚起见添加示例:
初始表:
+-------+------+------+
| Index | Col1 | Col2 |
+-------+------+------+
| 1 | abc | def |
| 2 | ghi | jkl |
| 3 | mno | pqr |
| 4 | stu | vwx |
+-------+------+------+
然后使用示例聚合操作: 首先反转累加器,在 Col1 前面附加 Col2 并返回该值,并将其设置为累加器。
+-------+------+------+--------------------------+
| Index | Col1 | Col2 | Accumulator |
+-------+------+------+--------------------------+
| 1 | abc | def | abcdef |
| 2 | ghi | jkl | ghifedcbajkl |
| 3 | mno | pqr | mnolkjabcdefihgpqr |
| 4 | stu | vwx | sturpqghifedcbajklonmvwx |
+-------+------+------+--------------------------+
使用 UserDefinedAggregateFunction
我已经能够制作这个,但带有 Aggregator
我只能得到最后一行。
最佳答案
你不
我的消息来源是一位 friend ,他一直在研究与此相同的问题,现在得出的结论是这是不可能的
关于java - 如何累积运行 Spark sql 聚合器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57145552/