目前,我正在开发一个项目,其中有一个 CSV 文件需要在“流处理”之前进行预处理。因此,我需要执行批处理和流处理。具体来说,我的 data.csv 文件需要在特定字段上进行预处理和排序,该字段将用作流处理的 EventTime 时间戳。下面的批处理脚本生成预处理的输出:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Long, String, String>> compactData = env
.readTextFile("data.csv")
.flatMap(new RichFlatMapFunction<String, Tuple3<Long, String, String>>() {
private CustomDelegate delegate;
@Override
public void open(Configuration parameters) throws Exception {
delegate = new CustomDelegate();
}
@Override
public void flatMap(String s, Collector<Tuple3<Long, String, String>> out)
throws Exception {
Tuple3<Long, String, String> datum = delegate.deserializeRide(s);
if (datum != null)
out.collect(datum);
}
});
compactData.partitionByRange(0)
.sortPartition(0, Order.ASCENDING)
.writeAsCsv("output_dir", "\n", ",");
env.execute();
我的默认并行度为 32,当批处理脚本(上面)完成执行时,将创建 output_dir
目录,其中包含 32 个文件。
问题1:我的问题是这些文件是否是根据全局顺序生成的。本质上,文件 1 中的记录是否比文件 2 中的记录具有更小的值(依此类推)?如果不是,我如何保证以前的或同等的东西?
正如我上面提到的,我使用 output_dir
中的文件作为流处理作业的输入,该作业由前一个字段的第一个字段(即 EventTime
)。流作业的代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Phase 0: Collect input and create timestamps
DataStream<Tuple3<Long, String, Integer>> dataStream = env
.readTextFile("output_dir")
.map(new MapFunction<String, Tuple3<Long, String, Integer>>() {
@Override
public Tuple3<Long, String, Integer> map(String s) throws Exception {
String[] tokens = s.split(",");
return new Tuple3<Long, String, Integer>(Long.parseLong(tokens[0]),
tokens[1] + "-" + tokens[2], 1);
}
})
.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {
@Override
public long extractAscendingTimestamp(Tuple3<Long, String, Integer> t) {
return t.f0;
}
});
env.execute();
问题2:我将目录output_dir
定义为输入,其中包含按字段0排序的文件。记录是否会根据以下条件解析并放置在数据流上:我想要的顺序(即在其字段 0 上)。如果不是,我相信我在分配时间戳时会遇到问题(对吗?)?我可以从多个文件中读取 DataStream(就像我现在所做的那样),还是必须将所有文件合并为一个,并通过从单个文件中连续读取所有记录来创建 DataStream?
最佳答案
Question 1: My question is whether those files are produced based on the global order. In essence, do the records in file 1 have smaller values compared to the records in file 2 (and so on.)? If no, how can I guarantee the previous or something equivalent?
没有。由于有32个分区,因此每个分区中的数据都是有序的。但不保证不同输出文件之间的数据顺序。您可以手动将 sortPartition 运算符的并行度设置为 1 或实现自己的 Partitioner,而不是哈希分区器。
Question 2: I define as input the directory output_dir, which contains the files sorted on field 0. Will the records be parsed and placed on the data stream based on the ordering that I want (i.e., on their field 0). If no, I believe that I will have problems with assigning timestamps (right?)? Can I have the DataStream be read from multiple files (as I do now), or do I have to combine all files into one, and create the DataStream by reading all records serially from a single file?
假设有32个输出文件,如果你的流作业的并行度也是32,那么每个文件将被一个并行度消耗,来自该输入文件的所有数据将根据出现在数据中的顺序进行处理文件当前的并行度。但是,一旦您尝试聚合 32 个并行度的数据或尝试打乱数据,数据的顺序就不再排序。如果您希望接收方对数据进行全局排序,您可能必须将所有数据放入一个文件中,并使用具有一种并行度的流作业来处理它们。
关于apache-flink - Apache 弗林克 : Sorting dataset and creating DataStream from multiple input files with Event times,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44166838/