csv - 在 Flink 中解析 CSV 时,在引用字段内转义引号

标签 csv apache-flink

在 Flink 中,当遇到包含像 "Fazenda São José ""OB"" Airport" 这样的引号的字段时,使用 readCsvFile 解析 CSV 文件会引发异常。 :

org.apache.flink.api.common.io.ParseException: Line could not be parsed: '191,"SDOB","small_airport","Fazenda São José ""OB"" Airport",-21.425199508666992,-46.75429916381836,2585,"SA","BR","BR-SP","Tapiratiba","no","SDOB",,"SDOB",,,'

我在 this 中找到了邮件列表线程和 this JIRA问题,应该通过\字符来实现字段内的引用,但我无法控制数据来修改它。有没有办法解决这个问题?

我也试过使用 ignoreInvalidLines() (这是不太可取的解决方案)但它给了我以下错误:
08:49:05,737 INFO  org.apache.flink.api.common.io.LocatableInputSplitAssigner    - Assigning remote split to host localhost
08:49:05,765 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN DataSource (at main(Job.java:53) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at main(Job.java:54)) -> Combine(SUM(1), at main(Job.java:56) (2/8)
java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.flink.api.common.io.GenericCsvInputFormat.skipFields(GenericCsvInputFormat.java:443)
    at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(GenericCsvInputFormat.java:412)
    at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:111)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:454)
    at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:79)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

这是我的代码:
DataSet<Tuple2<String, Integer>> csvInput = env.readCsvFile("resources/airports.csv")
            .ignoreFirstLine()
            .ignoreInvalidLines()
            .parseQuotedStrings('"')
            .includeFields("100000001")
            .types(String.class, String.class)
            .map((Tuple2<String, String> value) -> new Tuple2<>(value.f1, 1))
            .groupBy(0)
            .sum(1);

最佳答案

如果无法更改输入数据,则应关闭 parseQuotedString() .这将简单地查找下一个字段分隔符并将其间的所有内容作为字符串(包括引号)返回。然后就可以在后续的映射操作中去掉前后引号。

关于csv - 在 Flink 中解析 CSV 时,在引用字段内转义引号,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38300903/

相关文章:

json - Apache Flink:无法从 ObjectNode::get 中提取 key

java - OrderedStreamElementQueue - 潜在的死锁

java - 使用 Java OutputFormat 发出 Scala 元组

java - 窗口未完成其窗口长度

python - 根据 csv 中存储的路径和文本值高效更新 XML 文本内容

python - 尝试解析日志文件并使用 python 中的表创建 CVS 报告。由于某种原因,代码返回空表

linux - 如何根据多个条件将大文件 ".csv"分成小文件?

java - 当构造函数仅被调用一次时,如何使 volatile 字段对于每个线程都是唯一的?

javascript - NodeJS 在引导 csv 文件一段时间后崩溃

将具有多个值的字典列表写入多个文件的Python脚本