apache-flink - Flink sink filesystem as parquet - Error saving nested data

Tags: apache-flink parquet flink-sql flink-table-api

I am trying to convert JSON data to Parquet so that I can query it with Trino or Presto. A sample JSON record looks like this:

{"name": "success","message": "test","id": 1, "test1":  {"one": 1, "two":  2, "three":  "t3"}, "test2":  [1,2,3], "test3": [{"a": "a"},{"a": "aa"}], "test4": [{"a": "a"},{"a": "aa"}]}

My Flink code is as follows:

    tEnv.executeSql("create TEMPORARY table test (" +
                "name string," +
                "message string," +
                "id int," +
//                "test1 string," +
                "test1 map<string,string>," +
//                "test1 row (`one` int, `two` int, `three` string)," +
                "test2 array< int >," +
                "test3 array< map<string,string>>," +
                "test4 string" +
                ")" +
//                "ts as LOCALTIMESTAMP," +
//                "WATERMARK FOR ts AS ts - INTERVAL '10' SECOND)" +
                "with (" +
                "'connector' = 'filesystem'," +
                "'path' = 'file:///Users/successmalla/big_data/flink/src/main/resources/test.json'," +
                "'format' = 'json'" +
//                "'csv.ignore-parse-errors' = 'true'" +
                ")");
        // Printing the nested columns works fine, e.g.:
        // tEnv.executeSql("select test1['one'] as test_one, test1 " +
        //                 ", test2[1], test3[2]['a']" +
        //                 ", * " +
        //         "from test ")
        //         .print();
        tEnv.executeSql("create table test2 (" +
                "name string," +
                "message string," +
                "id int," +
                "test1 map<string,string>," +
//                "test1 row (`one` int, `two` int, `three` string)," +
//                "test2 array< int >," +
//                "test3 array< map<string,string>>," +
                "test4 string" +
                ")" +
//                "ts as LOCALTIMESTAMP," +
//                "WATERMARK FOR ts AS ts - INTERVAL '10' SECOND)" +
                "with (" +
                "'connector' = 'filesystem'," +
                "'path' = 'file:///Users/successmalla/big_data/flink/src/main/resources/testresilt'," +
                "'format' = 'parquet'" +
//                "'csv.ignore-parse-errors' = 'true'" +
                ")");

        tEnv.executeSql("insert into test2 " +
                "select name, message, id, test1, test4 " +
                "from test ");

With this, I get the following error:

    Caused by: java.lang.UnsupportedOperationException: Unsupported type: MAP<STRING, STRING>
        at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertToParquetType(ParquetSchemaConverter.java:105)
        at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertToParquetType(ParquetSchemaConverter.java:43)
        at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertToParquetMessageType(ParquetSchemaConverter.java:37)
        at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.<init>(ParquetRowDataBuilder.java:72)
        at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.<init>(ParquetRowDataBuilder.java:70)
        at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder.getWriteSupport(ParquetRowDataBuilder.java:67)
        at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:652)
        at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$FlinkParquetBuilder.createWriter(ParquetRowDataBuilder.java:135)
        at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:56)

I can print the data with the map, array, or row declarations, but I cannot save it as Parquet. Thanks in advance.
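Judging from the stack trace, ParquetSchemaConverter.convertToParquetType in this Flink version only handles atomic column types, so creating the Parquet writer fails as soon as the sink schema contains a MAP (or ARRAY/ROW) column. One workaround in the spirit of the commented-out query above is to flatten the nested values into atomic columns before they reach the Parquet sink. A minimal sketch, where the table name, the flattened columns, and the output path are made up for illustration:

        // Hypothetical flattened sink: only atomic types, so the Parquet
        // schema converter can handle every column.
        tEnv.executeSql("create table test2_flat (" +
                "name string," +
                "message string," +
                "id int," +
                "test1_one string," +   // test1['one'] lifted out of the map
                "test2_first int," +    // test2[1], the first array element (Flink arrays are 1-based)
                "test4 string" +
                ") with (" +
                "'connector' = 'filesystem'," +
                "'path' = 'file:///Users/successmalla/big_data/flink/src/main/resources/test_flat'," +
                "'format' = 'parquet'" +
                ")");

        tEnv.executeSql("insert into test2_flat " +
                "select name, message, id, test1['one'], test2[1], test4 " +
                "from test");

This keeps the output queryable from Trino/Presto, at the cost of fixing the extracted fields up front.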

Best answer

I went ahead with ORC instead, which works. :)
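For reference, a minimal sketch of what that could look like with the code from the question, assuming the flink-orc format dependency for the matching Flink version is on the classpath. The nested MAP and ARRAY columns stay in the sink schema, since the point of switching is that the ORC writer accepts them (the table name and output path are made up):

        // Hypothetical ORC sink: same schema as the source, including the
        // nested columns that the Parquet writer rejected.
        tEnv.executeSql("create table test_orc (" +
                "name string," +
                "message string," +
                "id int," +
                "test1 map<string,string>," +
                "test2 array<int>," +
                "test3 array<map<string,string>>," +
                "test4 string" +
                ") with (" +
                "'connector' = 'filesystem'," +
                "'path' = 'file:///Users/successmalla/big_data/flink/src/main/resources/test_orc'," +
                "'format' = 'orc'" +
                ")");

        tEnv.executeSql("insert into test_orc " +
                "select name, message, id, test1, test2, test3, test4 " +
                "from test");

Note that if this runs as a streaming job, the filesystem sink only finalizes files on checkpoints, so checkpointing must be enabled before finished ORC files appear in the output directory.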

The original question, "Flink sink filesystem as parquet - error saving nested data", can be found on Stack Overflow: https://stackoverflow.com/questions/69612199/
