java - nifi自定义处理器中的 session 未关闭异常

标签 java apache-nifi

想要获取文件并使用它的 xml 内容,然后更新其中一个标签,但更清楚地出了问题我的代码应该更改什么?

flowFile = session.putAttribute(flowFile,"filename",file.getName() + ".xml");
DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = builderFactory.newDocumentBuilder();
Document xmlDocument = builder.parse(ffStream);
ffStream.close();
XPath xPath = XPathFactory.newInstance().newXPath();
NodeList myNodeList = (NodeList) xPath.compile("//runAs/text()")
    .evaluate(flowFile, XPathConstants.NODESET);myNodeList.item(0).setNodeValue("false");
FlowFile flowFile2=session.create();
flowFile = session.putAttribute(flowFile2,"filename","config" + ".xml");
session.write(flowFile2, new OutputStreamCallback() {
    @Override
    public void process(final OutputStream out) throws IOException {
        TransformerFactory transformerFactory = TransformerFactory.newInstance();
        Transformer transformer = null;
        try {
            transformer = transformerFactory.newTransformer();
        } catch (TransformerConfigurationException e) {
            e.printStackTrace();
        }
        DOMSource source = new DOMSource(xmlDocument);
        ByteArrayOutputStream bos=new ByteArrayOutputStream();
        StreamResult result=new StreamResult(bos);
        try {
            transformer.transform(source, result);
        } catch (TransformerException e) {
            e.printStackTrace();
        }
        byte []array=bos.toByteArray();
        out.write(array);
    }
});

最佳答案

您的代码没有显示 ffStream 的创建或初始化,您使用的是 ffStream = session.read(flowFile) 吗?如果是这样,一旦您调用 putAttribute(),您为以前版本的流文件打开的输入流就不再有效,因为您通过调用 putAttribute() 创建了一个更新的版本。

在那之后你也有一些问题。例如,您最终创建了一个新的流文件,但随后在 session.write() 之后使用原始流文件的指针/变量来引用它。这肯定会导致问题。如果要覆盖传入的流文件,请将 session.write() 与 StreamCallback 而不是 OutputStreamCallback 一起使用。如果您想保留传入的流文件并创建一个新流文件,我建议为每个流文件(flowFile 和 flowFile2)维护一个变量,并在每次更新该文件时更新该变量(通过 session.write、session.putAttribute、等等)

关于java - nifi自定义处理器中的 session 未关闭异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46570210/

相关文章:

java - kafka 0.8.2.2有spring支持吗?

java - 键盘输入的最高性能

java - 泛型类的静态成员是否因 Java 中的不同类型而不同?

java - NoClassDefFound错误: org/apache/nifi/registry/VariableRegistry while running nifi test cases

java - 启用身份验证后 NiFi web ui 打不开

mysql - PutElasticsearchHttpRecord : Invalid char between encapsulated token and delimiter

hadoop - Apache NiFi/Hive-将合并的推文存储在HDFS中,在Hive中创建表

java - 返回三角形类型,无法识别标识符

java - Mapbox 无法在 fragment 中工作

docker - NiFi集群Docker负载均衡配置