java - 如何将 Dataset<Tuple2<String,DeviceData>> 转换为 Iterator<DeviceData>

标签 java apache-spark apache-spark-2.0 apache-spark-dataset

我有 Dataset<Tuple2<String,DeviceData>>并想将其转换为 Iterator<DeviceData> .

下面是我使用的代码 collectAsList()方法然后得到Iterator<DeviceData> .

Dataset<Tuple2<String,DeviceData>> ds = ...;
List<Tuple2<String, DeviceData>> listTuple = ds.collectAsList();

ArrayList<DeviceData> myDataList = new ArrayList<DeviceData>();
for(Tuple2<String, DeviceData> tuple : listTuple){
    myDataList.add(tuple._2());
}

Iterator<DeviceData> myitr = myDataList.iterator();

我无法使用 collectAsList()因为我的数据很大并且会影响性​​能。我查看了数据集 API,但找不到任何解决方案。我用谷歌搜索但找不到任何答案。有人可以指导我吗?如果解决方案是在 Java 中,那就太好了。谢谢。

编辑:
DeviceData类是简单的javabean。这是 ds 的 printSchema() 输出。
root
 |-- value: string (nullable = true)
 |-- _2: struct (nullable = true)
 |    |-- deviceData: string (nullable = true)
 |    |-- deviceId: string (nullable = true)
 |    |-- sNo: integer (nullable = true)

最佳答案

您可以直接提取DeviceData来自 ds而不是再次收集和 build 。

java :

Function<Tuple2<String, DeviceData>, DeviceData> mapDeviceData =
    new Function<Tuple2<String, DeviceData>, DeviceData>() {
      public DeviceData call(Tuple2<String, DeviceData> tuple) {
        return tuple._2();
      }
    };

Dataset<DeviceData> ddDS = ds.map(mapDeviceData) //extracts DeviceData from each record

斯卡拉:
val ddDS = ds.map(_._2) //ds.map(row => row._2)

关于java - 如何将 Dataset<Tuple2<String,DeviceData>> 转换为 Iterator<DeviceData>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42456409/

相关文章:

java - 如何检查和验证字符串索引的多个字符?

javascript - 我想在输入数据本身时显示错误消息(在键盘上)

hadoop - 我可以在Spark中创建序列文件吗?

apache-spark - 加载包时禁止来自 spark-submit 的消息

python - 在pyspark中加入具有相同列名的数据框

java - 在sparkSession上注册两个具有相同名称的udf

java - ant-contrib:过时的问题

java - 从java获取接口(interface)NAME

apache-spark - 如何将DataFrame的Spark sql表达式中的空值写入数据库表? (非法参数异常 : Can't get JDBC type for null)

python-3.x - "PythonAccumulatorV2 does not exist"- 在 Jupyter Notebook 中运行 SparkContext() 时