java - 复制数据集中的行并更改值

标签 java apache-spark

我的问题

在我的 Java 应用程序中,我需要解析 DataSet<Row> ,然后根据函数复制一些行,并更改复制行中某一列的值。

编辑:根据 map 功能,行可能会重复 2 或 3 次,每次重复将具有不同的值。

我尝试过的

让我们以DataSet<Row> foo为例(内容已使用 parquet 文件初始化)。现在我要申请foo.flatMap(bar, ??)哪里barFlatMapFunction<Row, Row>返回一个迭代器和 ??是一个我无法理解的字段,因为文档中有关此方法的信息非常少。

迭代器的构造如下:

final List<Row> list = new ArrayList<Row>();
list.add(row);

final Object[] newRow = new Object[row.length()];

for (int i = 0; i<row.length();i++){
    newRow[i] = row.get(i);
}

newRow[row.fieldIndex("fieldName")] = someLong;
list.add(RowFactory.create(newRow));

return list.iterator();

我的问题

我创建迭代器的方式正确吗?

这个神秘的编码器是什么?我应该在这个字段中放入什么?

最佳答案

flatMap() 文档说

Returns a new Dataset by first applying a function to all elements of this Dataset, and then flattening the results.

我认为您应该使用 filter() 来获取仅包含您要复制和修改的行的数据集。之后,使用 foreach() 修改这些行,然后使用 union() 两个数据集。

请注意,我还没有尝试过此操作,因此我不确定 filter() 方法是否创建包含新行的数据集或仅引用原始数据集中的行。如果它不创建新行,则只需将所有过滤后的行符合到新数据集即可。

关于java - 复制数据集中的行并更改值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39621276/

相关文章:

java - Jaxb 使用 "get"而不是 "is"前缀创建 boolean 方法访问器名称

sql - Spark 1.3 中未找到命令策略类

hadoop - 如何在我的Spark Streaming应用程序中移动文件

scala - 为什么sparkcontext停止了?

scala - Spark Structured Streaming DataFrame 上的排序操作

java - `public static <T> void main(String[] args)` 代表什么?

java - Firebase .indexOn 未按预期工作

java - 有一个禁用的onClick?

java - 在构造函数中设置父子关系而不泄漏 "this"变量

apache-spark - 使用spark ml时如何以另一种方式索引分类特征