概述
我正在使用 Java Spark 计算大量数据。我正在将许多名为 feeders
的 .DAT
文件的内容加载到 Dataset
中。这些 .DAT
文件在其他字段中包含一个时间戳(从 1970 年 1 月 1 日开始的秒数)开始记录。在这里,文件中的每一行都代表一秒钟。当开始记录数据 1546297200(2018 年 12 月 31 日星期一晚上 11:00:00,格林威治标准时间)时,.DAT
文件 的示例如下:
id & deltaRSTs| timestamp|
| |
SQXCXBAXY4P-02,1546297200,825,2065,391
1,0,-8,0 |1546297200|
1,0,-2,0 |1546297201|
1,0,0,0 |1546297202|
1,0,10,0 |1546297203|
1,0,-6,0 |1546297204|
1,0,-4,0 |1546297205|
1,0,0,0 ...
1,0,6,0 ...
1,0,1,0 ...
1,0,-8,0 ...
另一方面,我有另一个数据集
,其中包含有关某些电气设备的信息(.csv
格式)。这里的重要部分是,此类设备创建了一组具有不同时间戳(从 1970 年 1 月 1 日开始跨越的秒数)的事件(让我们用 EVT
来表达它)。
我想从满足特定时间条件的 .DAT
文件中获取所有行:给定一个事件,获取所有 DAT
行,这样EVT(timestamp)
在 DAT(timestamp)
中考虑了创建窗口的偏移量,即:
maxEpoch = DAT(timestamp) + rows_of_DAT
DAT(timestamp) + offset <= EVT(timestamp) && EVT(timestamp) <= maxEpoch - offset
如果您不完全理解这一点,请不要担心,这是为了提供一些背景知识。但是您必须掌握这个想法。
问题
我将介绍我认为适合解决上述案例的类:
类 ReadCSV
(主要):
public class ReadCSV {
private static final SparkSession spark = new SparkSession
.Builder()
.master("local[*]")
.getOrCreate();
public static void main(String[] args) {
spark.sparkContext().setLogLevel("ERROR");
Dataset<EventCSVRow> eventCSVRowDataset = getEventCSVRow(".\\eventLog\\*.csv"); //events from the CSVs
Dataset<FeederFileRow> feederFileRowDataset = getFeederFileDataset(".\\feeder\\*.DAT"); // all rows from all .DATs
Dataset<ProcessEvRow> processEvRowDataset = eventCSVRowDataset
.map(new ProcessEvTransformation(feederFileRowDataset), Encoders.bean(ProcessEvRow.class))
}
}
请注意,在此类中,当创建类型为 ProcessEvTransformation
的对象时,我将所有 DAT
行的 Dataset
作为参数传递.
类ProcessEvTransformation
:
public class ProcessEvTransformation implements MapFunction<EventCSVRow, ProcessEvRow> {
private Dataset<FeederFile> feederFileDataset;
private int offset = 40;
public ProcessEvTransformation(Dataset<FeederFile> feederFileDataset) {
this.feederFileDataset = feederFileDataset;
// I did here, this.feederFileDataset.show(); and it was successfull
}
public ProcessEvTransformation withOffset(int offset) {
this.offset = offset;
return this;
}
@Override
public ProcessEvRow call(EventCSVRow eventCSVRow) throws Exception {
String stdPattern = ...
String rejectedFlag = ...
Dataset<FeederFile> deltaRSTs = this.feederFileDataset
.filter(feederFileRow -> {
final long epochTime = Long.parseLong(eventCSVRow.getEpochTime());
final long maxEpoch = Long.parseLong(feederFileRow.getEpoch()) + feederFileRow.getLineCount();
return Long.parseLong(feederFileRow.getEpoch()) + offset <= epochTime && epochTime <= maxEpoch - offset;
});
String[] rstDistances = getRstDistancesAndMinimum(deltaRSTs, eventCSVRow.getIncrements()); // whatever algorithmic procedure
...
}
}
这里的问题是我得到了一个 NullPointerException
因为 feederFileDataset
属性有点为空。奇怪的是,我很确定它是 100% 定义的,但是当 call
方法被调用时,它变成了 null 或 Invalid tree; null:
(打印时显示的信息)
问题与结论
- 有谁知道如何将
Dataset
作为参数成功传递给基于MapFunction
接口(interface)类? - 为什么
Dataset
在我正确传递时变成了无效的东西?和Java Spark的内部流程有关系吗?
希望我已经说清楚了。感谢您提供的任何帮助。
祝你好运, 托马斯。
最佳答案
由于您必须保留所有数据,因此我建议使用crossJoin
方法。请注意,此方法非常昂贵。
public class ReadCSV {
private static final SparkSession spark = new SparkSession
.Builder()
.master("local[*]")
.getOrCreate();
public static void main(String[] args) {
spark.sparkContext().setLogLevel("ERROR");
Dataset<EventCSVRow> eventCSVRowDataset = getEventCSVRow(".\\eventLog\\*.csv"); //events from the CSVs
Dataset<FeederFileRow> feederFileRowDataset = getFeederFileDataset(".\\feeder\\*.DAT"); // all rows from all .DATs
Dataset<Andamyo> joined = eventCSVRowDataset
.crossJoin(feederFileRowDataset).as(Encoders.bean(Andamyo.class))
.filter(andamyo -> {
final long eventEpoch = Long.parseLong(andamyo.getEventEpoch());
final long maxEpoch = Long.parseLong(andamyo.getFeederEpoch()) + andamyo.getLineCount();
return Long.parseLong(andamyo.getFeederEpoch()) <= eventEpoch && eventEpoch <= maxEpoch;
});
}
}
另一方面,类 Andamyo
表示包含 event
和 feederFileRow
的所有连接信息的行。
输出示例:
+----------+--------------------+----------+-----+------------+-----------+--------+---------+-------------+
|eventEpoch| increments|internalId|phase| deltaRST|feederEpoch|feederId|lineCount| mrtCode|
+----------+--------------------+----------+-----+------------+-----------+--------+---------+-------------+
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 46|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 47|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 48|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 49|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 50|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 51|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 52|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 53|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 54|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 55|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 56|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 57|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 58|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 59|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 60|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 61|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 62|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 63|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 64|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 65|MRT0000020611|
+----------+--------------------+----------+-----+------------+-----------+--------+---------+-------------+
only showing top 20 rows
root
|-- eventEpoch: string (nullable = true)
|-- increments: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- internalId: string (nullable = true)
|-- phase: string (nullable = true)
|-- deltaRST: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- feederEpoch: string (nullable = true)
|-- feederId: string (nullable = true)
|-- lineCount: long (nullable = true)
|-- mrtCode: string (nullable = true)
检查
应用的偏移量为 0。检查 eventEpoch
是否在 feederEpoch
加上它的行数(通常为 3600 秒)的范围内,如您所知,每一行代表一秒钟。查看如何检查的示例。
for first row:
|1564995646|[2, 2, 2, 75, 33,...| 11| R|[1, 0, 0, 0]| 1564995600| 02| 46|MRT0000020611|
1564995646 >= 1564995600 and 1564995646 <= 1564995600 + 3600 ===> TRUE
关于java - 将 Dataset<T> 传递给基于 MapFunction 接口(interface)的类 Java Spark 时为 null,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59193005/