java - Flink 将 csv 文件映射到元组中

标签 java apache-kafka apache-flink map-function data-stream

我正在尝试将已由 Flink 使用并由 Kafka 生成的 CSV 文件映射到 Tuple4。我的 CSV 文件有 4 列,我想将每一行映射到 Tuple4。问题是我不知道如何实现map()和csv2Tuple函数。

这是我陷入困境的地方:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

ParameterTool parameterTool = ParameterTool.fromArgs(ARGS);

DataStreamSource<String> myConsumer = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"),
            new SimpleStringSchema(), parameterTool.getProperties()));

DataStream<Tuple4<Integer, Integer, Integer, Integer>> streamTuple = myConsumer.map(new csv2Tuple());
public static class csv2Tuple implements MapFunction<...> {public void map(){...}}

我还想将元组中的项目从字符串解析为整数。

最佳答案

假设您将 csv 文件的每一行生成为 Kafka 消息,并使用 Flink Kafka 连接器使用它,您只需使用 , 分割每条消费的消息(因为它是一个 csv 文件)。

DataStream<Tuple4<Integer, Integer, Integer, Integer,>> streamTuple = myConsumer.map(new MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>() {
            @Override
            public Tuple4<Integer, Integer, Integer, Integer> map(String str) throws Exception {
                String[] temp = str.split(",");
                return new Tuple4<>(
                        Integer.parseInt(temp[0]),
                        Integer.parseInt(temp[1]),
                        Integer.parseInt(temp[2]),
                        Integer.parseInt(temp[3])
                );

            }
        });

关于java - Flink 将 csv 文件映射到元组中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52016043/

相关文章:

scala - 如何在scala中使用flink折叠功能

Java 8 Lambda 表达式验证

java - J2EE api 显示 HttpServletResponse 有一个 'getStatus()' 调用。 J2EE jar 文件没有它

maven - 使用架构注册表时出现问题 :download

node.js - 用 react 阅读卡夫卡的主题

hadoop - YARN-指定在哪个Nodemanager上运行哪个应用

java - 如何为JNLP应用程序设置preferIPv4Stack属性?

java - Apache Camel : Reply received for unknown correlationID

node.js - MQTT 到卡夫卡。如何避免重复

apache-flink - 如何使用 Flink Java API 计算 DataStream 中的不同值