我正在尝试将我的应用程序从 flink 流处理转换为 flink 批处理。
对于 flink 数据流,我从具有多个 JSON 对象的预定义文件中读取字符串,并从 Json 对象到 tuple3 收集器进行平面映射(第一个元素 - 来自 json 对象的一个字段,第二个元素 - 来自 json 对象的另一个字段,第三个元素 - 实际的 json 对象数据)。
DataStream<Tuple3<String, Integer, ObjectNode>> transformedSource = source.flatMap(new FlatMapFunction<String, Tuple3<String, Integer, ObjectNode>>() {
@Override
public void flatMap(String value, Collector<Tuple3<String, Integer, ObjectNode>> out) throws Exception {
ObjectNode record = mapper.readValue(value, ObjectNode.class);
JsonNode customer = record.get("customer");
JsonNode deviceId = record.get("id");
if (customer != null && deviceId != null) {
out.collect(Tuple3.of(customer.asText(), deviceId.asInt(), record));
}
}
});
然后,在窗口内对元组的第一个元素和元素执行 keyBy 操作。
WindowedStream<Tuple3<String, Integer,ObjectNode>, Tuple, TimeWindow> combinedData = transformedSource
.keyBy(0, 1)
.timeWindow(Time.seconds(5));
对于flink批处理,DataSet Batch的KeyBy怎么做,DataSet中有KeyBy的等效方法
DataSet<String> source = env.readTextFile("file:///path /to/ file");
DataSet<Tuple3<String, Integer, ObjectNode>> transformedSource = source.flatMap(new FlatMapFunction<String, Tuple3<String, Integer, ObjectNode>>() {
@Override
public void flatMap(String value, Collector<Tuple3<String, Integer, ObjectNode>> out) throws Exception {
ObjectNode record = mapper.readValue(value, ObjectNode.class);
JsonNode customer = record.get("customer");
JsonNode deviceId = record.get("id");
if (customer != null && deviceId != null) {
out.collect(Tuple3.of(customer.asText(), deviceId.asInt(), record));
}
}
});
最佳答案
groupBy似乎是您正在寻找的方法
关于apache-flink - 如何对数据集批处理的第一个元组字段元素进行 keyBy,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40668028/