apache-flink - 如何对数据集批处理的第一个元组字段元素进行 keyBy

标签 apache-flink

我正在尝试将我的应用程序从 flink 流处理转换为 flink 批处理。

对于 flink 数据流,我从具有多个 JSON 对象的预定义文件中读取字符串,并从 Json 对象到 tuple3 收集器进行平面映射(第一个元素 - 来自 json 对象的一个​​字段,第二个元素 - 来自 json 对象的另一个字段,第三个元素 - 实际的 json 对象数据)。

DataStream<Tuple3<String, Integer, ObjectNode>> transformedSource = source.flatMap(new FlatMapFunction<String, Tuple3<String, Integer, ObjectNode>>() {
                @Override
                public void flatMap(String value, Collector<Tuple3<String, Integer, ObjectNode>> out) throws Exception {
                    ObjectNode record = mapper.readValue(value, ObjectNode.class);
                    JsonNode customer = record.get("customer");
                    JsonNode deviceId = record.get("id");
                                       if (customer != null && deviceId != null) {
                        out.collect(Tuple3.of(customer.asText(), deviceId.asInt(), record));
                    }
                }
            });

然后,在窗口内对元组的第一个元素和元素执行 keyBy 操作。

WindowedStream<Tuple3<String, Integer,ObjectNode>, Tuple, TimeWindow> combinedData = transformedSource
            .keyBy(0, 1)
            .timeWindow(Time.seconds(5));

对于flink批处理,DataSet Batch的KeyBy怎么做,DataSet中有KeyBy的等效方法

DataSet<String> source = env.readTextFile("file:///path /to/ file");


DataSet<Tuple3<String, Integer, ObjectNode>> transformedSource = source.flatMap(new FlatMapFunction<String, Tuple3<String, Integer, ObjectNode>>() {
                @Override
                public void flatMap(String value, Collector<Tuple3<String, Integer, ObjectNode>> out) throws Exception {
                    ObjectNode record = mapper.readValue(value, ObjectNode.class);
                    JsonNode customer = record.get("customer");
                    JsonNode deviceId = record.get("id");
                                       if (customer != null && deviceId != null) {
                        out.collect(Tuple3.of(customer.asText(), deviceId.asInt(), record));
                    }
                }
            });

最佳答案

groupBy似乎是您正在寻找的方法

关于apache-flink - 如何对数据集批处理的第一个元组字段元素进行 keyBy,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40668028/

相关文章:

elasticsearch - 单个flink管道的多个Elasticsearch接收器

apache-flink - Apache Flink : Enrich stream with data from external/blocking call

memory - Spark vs Flink 可用内存不足

java - Apache 弗林克 : can't use writeAsCsv() with a datastream of subclass tuple

serialization - Flink流: Unexpected charaters in serialized String messages

apache-flink - 在 Apache flink 中的节点之间共享数据集的最佳方式是什么?

elasticsearch - 使用Flink Rich InputFormat创建Elasticsearch的输入格式

apache-flink - Flink Jdbc 接收器

java - 使用 Java 通过 Apache Flink 减少 Pojo 字段

kubernetes - 超时尝试为检查点作业启动 flink 作业主机