我正在使用 Jackson 在 Hadoop 中实现一个 JSON RecordReader。 到目前为止,我正在使用 JUnit + MRUnit 在本地进行测试。 每个 JSON 文件包含一个对象,在一些 header 之后,它有一个字段,其值是一个条目数组,我希望将每个条目理解为一个记录(因此我需要跳过这些 header )。
我可以通过将 FSDataInputStream 推进到读取点来做到这一点。 在我的本地测试中,我执行以下操作:
fs = FileSystem.get(new Configuration());
in = fs.open(new Path(filename));
long offset = getOffset(in, "HEADER_START_HERE");
in.seek(offset);
其中 getOffset 是一个函数,它指向字段值开始的 InputStream - 如果我们查看 in.getPos()
值,它就可以正常工作。
我正在阅读第一条记录:
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readValue (in, JsonNode.class);
第一条记录正常返回。我可以使用 mapper.writeValueAsString(actualObj)
,它读起来很好,而且有效。
到此为止。
所以我尝试迭代对象,方法是:
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = null;
do {
actualObj = mapper.readValue (in, JsonNode.class);
if( actualObj != null) {
LOG.info("ELEMENT:\n" + mapper.writeValueAsString(actualObj) );
}
} while (actualObj != null) ;
它读取第一个,但随后中断:
java.lang.NullPointerException: null
at org.apache.hadoop.fs.BufferedFSInputStream.getPos(BufferedFSInputStream.java:54)
at org.apache.hadoop.fs.FSDataInputStream.getPos(FSDataInputStream.java:57)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:243)
at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:273)
at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:225)
at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:193)
at java.io.DataInputStream.read(DataInputStream.java:132)
at org.codehaus.jackson.impl.ByteSourceBootstrapper.ensureLoaded(ByteSourceBootstrapper.java:340)
at org.codehaus.jackson.impl.ByteSourceBootstrapper.detectEncoding(ByteSourceBootstrapper.java:116)
at org.codehaus.jackson.impl.ByteSourceBootstrapper.constructParser(ByteSourceBootstrapper.java:197)
at org.codehaus.jackson.JsonFactory._createJsonParser(JsonFactory.java:503)
at org.codehaus.jackson.JsonFactory.createJsonParser(JsonFactory.java:365)
at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1158)
为什么会出现这个异常?
这与在本地阅读有关吗?
重用 ObjectMapper
或其底层流时是否需要某种重置或其他方式?
最佳答案
我设法解决了这个问题。如果有帮助:
首先,我使用的是 Jackson 1.x 最新版本。
似乎一旦 JsonParser
被 InputStream
实例化,它就会控制它。
因此,当使用 readValue()
时,一旦它被读取(在内部调用 _readMapAndClose()
会自动关闭流。
您可以设置一个设置来告诉 JsonParser
不要关闭底层流。在创建 JsonParser
之前,您可以像这样将它传递给您的 JsonFactory
:
JsonFactory f = new MappingJsonFactory();
f.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
请注意您负责关闭流(在我的例子中是 FSDataInputStream)。 所以,答案:
- 为什么会出现这种异常?
因为解析器管理流,并在 readValue() 之后关闭它。
- 这与在本地阅读有关吗?
没有
- 在重用 ObjectMapper 或其底层流时是否需要某种重置或其他方式?
没有。在将 Streaming API 与类似 ObjectMapper 的方法混合使用时,您需要注意的是,有时映射器/解析器可能会控制底层流。引用 JsonParser 的 Javadoc并查看有关每种阅读方法的文档以满足您的需求。
关于java - Hadoop + Jackson 解析: ObjectMapper reads Object and then breaks,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26803026/