java - 将 Dataset<T> 传递给基于 MapFunction 接口(interface)的类 Java Spark 时为 null

标签 java apache-spark apache-spark-sql

概述

我正在使用 Java Spark 计算大量数据。我正在将许多名为 feeders.DAT 文件的内容加载到 Dataset 中。这些 .DAT 文件在其他字段中包含一个时间戳(从 1970 年 1 月 1 日开始的秒数)开始记录。在这里,文件中的每一行都代表一秒钟。当开始记录数据 1546297200(2018 年 12 月 31 日星期一晚上 11:00:00,格林威治标准时间)时,.DAT 文件 的示例如下:


id & deltaRSTs| timestamp| 
              |          |
SQXCXBAXY4P-02,1546297200,825,2065,391
1,0,-8,0      |1546297200|
1,0,-2,0      |1546297201|
1,0,0,0       |1546297202|
1,0,10,0      |1546297203|
1,0,-6,0      |1546297204|
1,0,-4,0      |1546297205|
1,0,0,0       ... 
1,0,6,0       ...
1,0,1,0       ...
1,0,-8,0      ...

另一方面,我有另一个数据集,其中包含有关某些电气设备的信息(.csv 格式)。这里的重要部分是,此类设备创建了一组具有不同时间戳(从 1970 年 1 月 1 日开始跨越的秒数)的事件(让我们用 EVT 来表达它)。

我想从满足特定时间条件的 .DAT 文件中获取所有行:给定一个事件,获取所有 DAT 行,这样EVT(timestamp)DAT(timestamp) 中考虑了创建窗口的偏移量,即:

maxEpoch = DAT(timestamp) + rows_of_DAT
DAT(timestamp) + offset <= EVT(timestamp) && EVT(timestamp) <= maxEpoch - offset

如果您不完全理解这一点,请不要担心,这是为了提供一些背景知识。但是您必须掌握这个想法。

问题

我将介绍我认为适合解决上述案例的类:

ReadCSV(主要):

public class ReadCSV {

    private static final SparkSession spark = new SparkSession
            .Builder()
            .master("local[*]")
            .getOrCreate();

    public static void main(String[] args) {

        spark.sparkContext().setLogLevel("ERROR");

        Dataset<EventCSVRow> eventCSVRowDataset = getEventCSVRow(".\\eventLog\\*.csv"); //events from the CSVs

        Dataset<FeederFileRow> feederFileRowDataset = getFeederFileDataset(".\\feeder\\*.DAT"); // all rows from all .DATs

        Dataset<ProcessEvRow> processEvRowDataset = eventCSVRowDataset
            .map(new ProcessEvTransformation(feederFileRowDataset), Encoders.bean(ProcessEvRow.class))

    }
}

请注意,在此类中,当创建类型为 ProcessEvTransformation 的对象时,我将所有 DAT 行的 Dataset 作为参数传递.

ProcessEvTransformation:

public class ProcessEvTransformation implements MapFunction<EventCSVRow, ProcessEvRow> {

    private Dataset<FeederFile> feederFileDataset;
    private int offset = 40;

    public ProcessEvTransformation(Dataset<FeederFile> feederFileDataset) {
        this.feederFileDataset = feederFileDataset;
        // I did here, this.feederFileDataset.show(); and it was successfull
    }

    public ProcessEvTransformation withOffset(int offset) {
        this.offset = offset;
        return this;
    }

    @Override
    public ProcessEvRow call(EventCSVRow eventCSVRow) throws Exception {
        String stdPattern = ...
        String rejectedFlag = ...
        Dataset<FeederFile> deltaRSTs = this.feederFileDataset
                .filter(feederFileRow -> {
                    final long epochTime = Long.parseLong(eventCSVRow.getEpochTime());
                    final long maxEpoch = Long.parseLong(feederFileRow.getEpoch()) + feederFileRow.getLineCount();
                    return Long.parseLong(feederFileRow.getEpoch()) + offset <= epochTime && epochTime <= maxEpoch - offset;
                });

        String[] rstDistances = getRstDistancesAndMinimum(deltaRSTs, eventCSVRow.getIncrements()); // whatever algorithmic procedure
        ...
    }
}

这里的问题是我得到了一个 NullPointerException 因为 feederFileDataset 属性有点为空。奇怪的是,我很确定它是 100% 定义的,但是当 call 方法被调用时,它变成了 null 或 Invalid tree; null:(打印时显示的信息)

问题与结论

  • 有谁知道如何将 Dataset 作为参数成功传递给基于 MapFunction 接口(interface)类?
  • 为什么 Dataset 在我正确传递时变成了无效的东西?和Java Spark的内部流程有关系吗?

希望我已经说清楚了。感谢您提供的任何帮助。

祝你好运, 托马斯。

最佳答案

由于您必须保留所有数据,因此我建议使用crossJoin 方法。请注意,此方法非常昂贵。

public class ReadCSV {

    private static final SparkSession spark = new SparkSession
            .Builder()
            .master("local[*]")
            .getOrCreate();

    public static void main(String[] args) {

        spark.sparkContext().setLogLevel("ERROR");

        Dataset<EventCSVRow> eventCSVRowDataset = getEventCSVRow(".\\eventLog\\*.csv"); //events from the CSVs

        Dataset<FeederFileRow> feederFileRowDataset = getFeederFileDataset(".\\feeder\\*.DAT"); // all rows from all .DATs

        Dataset<Andamyo> joined = eventCSVRowDataset
            .crossJoin(feederFileRowDataset).as(Encoders.bean(Andamyo.class))
            .filter(andamyo -> {
                final long eventEpoch = Long.parseLong(andamyo.getEventEpoch());
                final long maxEpoch = Long.parseLong(andamyo.getFeederEpoch()) + andamyo.getLineCount();
                return Long.parseLong(andamyo.getFeederEpoch()) <= eventEpoch && eventEpoch <= maxEpoch;
            });

    }
}

另一方面,类 Andamyo 表示包含 eventfeederFileRow 的所有连接信息的行。

输出示例:

+----------+--------------------+----------+-----+------------+-----------+--------+---------+-------------+
|eventEpoch|          increments|internalId|phase|    deltaRST|feederEpoch|feederId|lineCount|      mrtCode|
+----------+--------------------+----------+-----+------------+-----------+--------+---------+-------------+
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       46|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       47|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       48|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       49|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       50|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       51|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       52|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       53|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       54|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       55|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       56|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       57|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       58|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       59|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       60|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       61|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       62|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       63|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       64|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       65|MRT0000020611|
+----------+--------------------+----------+-----+------------+-----------+--------+---------+-------------+
only showing top 20 rows

root
 |-- eventEpoch: string (nullable = true)
 |-- increments: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- internalId: string (nullable = true)
 |-- phase: string (nullable = true)
 |-- deltaRST: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- feederEpoch: string (nullable = true)
 |-- feederId: string (nullable = true)
 |-- lineCount: long (nullable = true)
 |-- mrtCode: string (nullable = true)

检查

应用的偏移量为 0。检查 eventEpoch 是否在 feederEpoch 加上它的行数(通常为 3600 秒)的范围内,如您所知,每一行代表一秒钟。查看如何检查的示例。

for first row:
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       46|MRT0000020611|

1564995646 >= 1564995600 and 1564995646 <= 1564995600 + 3600 ===> TRUE

关于java - 将 Dataset<T> 传递给基于 MapFunction 接口(interface)的类 Java Spark 时为 null,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59193005/

相关文章:

apache-spark - 如何在 pyspark 数据帧读取方法中包含分区列

java - DocumentsProvider <provider> 标签可以在针对 API < 19 的 Android list 中使用吗?

java - 如何创建 RomanNumeral 对象并将其转换回 int、long 等?

java - 使用 Spark 展平嵌套 json 文档并加载到 Elasticsearch 中

apache-spark - 如何在一列上聚合并在 pyspark 中最大限度地利用其他列?

scala - Spark Windowsspec 滞后函数计算累积分数

java - 用另一种方法从textview中获取值

Java:@SuppressWarnings(...) 参数的完整列表(在 Netbeans 中)?

apache-spark - Spark 与 Dask 的容错

java - 如何避免在spark saveAsHadoopFile()中生成.crc文件?