java - Apache Nifi AbstractProcessor 的 onTrigger() 方法中的零流文件

标签 java apache-nifi

我正在为 Apache NiFi 开发自定义处理器。我已经创建了我的处理器的nar并将其放入nifi的lib文件夹中并启动了nifi。我已经在 Eclipse 中设置了远程调试器,并在 onTrigger() 第一行启用了断点。调试时,我在 nifi 管道中一次运行一个处理器。我可以在自定义处理器的输入队列中找到单个流文件,但是我的自定义处理器未接收任何流文件。当我启动自定义处理器时,它会在 onTrigger() 方法内遇到断点。在这个方法内部,当我这样做时:

public class MyCustomProc extends AbstractProcessor {

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

        List<FlowFile> flowFiles = session.get(5000);
        if (flowFiles == null || flowFiles.size() == 0) {
            return;
        }
        //...

flowFiles 结果大小为零!!! 我无法猜测应该朝哪个方向检查以找出发生这种情况的原因。有什么提示我可以诊断这个吗?

编辑

堆栈跟踪

2019-05-02 18:08:09,456 ERROR [Timer-Driven Process Thread-10] c.c.product.module.submodule.MyCustomProcessor MyCustomProcessor[id=016a1008-8956-1dbf-bd66-993e0ce98668] MyCustomProcessor[id=016a1008-8956-1dbf-bd66-993e0ce98668] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=408fbb3d-7cc2-48bc-be8f-6d0afdbddaf2,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1556800468726-1, container=default, section=1], offset=261, length=591447],offset=0,name=188149730353200,size=591447] transfer relationship not specified; rolling back session: {}
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=408fbb3d-7cc2-48bc-be8f-6d0afdbddaf2,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1556800468726-1, container=default, section=1], offset=261, length=591447],offset=0,name=188149730353200,size=591447] transfer relationship not specified
    at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:251)
    at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:321)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

PS1:此方法立即从 if 的主体内部返回,这给了我以下异常:

org.apache.nifi.processor.exception.FlowFileHandlingException:未指定StandardFlowFileRecord传输关系

由于流文件位于我的自定义处理器的输入队列中,因此此异常会永远重复出现。

PS2:我在 apps.log 中收到以下错误,但我不确定这是否是问题的根源:

2019-05-02 18:17:32,394 ERROR [Timer-Driven Process Thread-4] o.a.nifi.groups.StandardProcessGroup Unable to synchronize StandardProcessGroup[identifier=d25747e6-719e-3ed9-c6c5-56794af6555c] with Flow Registry because Process Group was placed under Version Control using Flow Registry with identifier 80016ab0-bfab-152b-ffff-ffffc441867c but cannot find any Flow Registry with this identifier

最佳答案

有时获得零流量文件是正常行为,这就是处理器在开始时进行检查的原因。

FlowFileHandlingException 意味着从 session 中获取了一个流文件(无论是通过 get 还是 create),并且该流文件没有转移到任何地方也没有被删除,所以基本上它是下落不明的。仅在 if 语句的开头返回不会发生这种情况,因此处理器代码的其余部分正在执行并产生此错误。您尚未提供其余代码,因此我们看不到问题。

第二个问题是相当不言自明的。您有一个受版本控制的进程组,但用于启动版本控制的注册表客户端不知何故不再存在。我不知道您是如何创建此场景的,因为我相信 UI/API 不会让您删除在版本控制下具有 Activity 流的注册表客户端,但您应该能够停止进程组上的版本控制。

关于java - Apache Nifi AbstractProcessor 的 onTrigger() 方法中的零流文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55953174/

相关文章:

java - java jar 中的路径问题

java - 从 DOM 中的 XML 文件中删除数据?

java - gl.glClearColor 有什么作用?

apache-nifi - 如何等待GenerateTableFetch查询完成

java - NiFi : Send flowFile @OnflowFile

apache-nifi - Apache Beam 和 Apache Nifi 的区别

java - 项目编译后 map 不显示

java - 如何根据传递的类型调用枚举方法?

apache-nifi - 集群NIFI,只有一个节点在工作

java - Apache NiFi - NullPointerException 在自定义处理器上设置多个线程