apache-nifi - 如何使用 NIFI 中的自定义处理器逐一传输流程文件

标签 apache-nifi

我正在 Nifi v 1.3 中编写自定义处理器

处理器执行从结果集中读取的 SQL 查询,并将每一行转换为 json 文档并将其存储到 ArrayList 中,最后它将每 1000 个文档(fetchSize 参数)传输到一个流文件,这对我来说很有效,但它发送了所有一次流文件。

我想要的是当我调用transferFlowFile方法时它独立地传输每个流文件,而不是等待onTrigger方法结束一次传输所有内容。

这里是代码:

public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    FlowFile fileToProcess = null;
    if (context.hasIncomingConnection()) {
       fileToProcess = session.get();
       if (fileToProcess == null && context.hasNonLoopConnection()) {
          return;
       }
    }

    final ResultSet resultSet = st.executeQuery();
    final ResultSetMetaData meta = resultSet.getMetaData();
    final int nrOfColumns = meta.getColumnCount();
    List<Map<String, Object>> documentList = new ArrayList<>();

        while (resultSet.next()) {

           final AtomicLong nrOfRows = new AtomicLong(0L);
           cpt++;

           Map<String, Object> item = new HashMap<>();
           for (int i = 1; i <= nrOfColumns; i++) {
               int javaSqlType = meta.getColumnType(i);
               String nameOrLabel = StringUtils.isNotEmpty(meta.getColumnLabel(i)) ? meta.getColumnLabel(i)
                    : meta.getColumnName(i);
               Object value = null;
               value = resultSet.getObject(i);
               if (value != null) {
                  item.put(nameOrLabel, value.toString());
               }
           }
           Document document = new Document(item);
           documentList.add(document);

           if (fetchSize!=0 && cpt % fetchSize == 0) {
            FlowFile flowFile = session.create();
            transferFlowFile(flowFile, session, documentList, fileToProcess, nrOfRows, stopWatch);
           }
       }

       if (!documentList.isEmpty()) {
          final AtomicLong nrOfRows = new AtomicLong(0L);
          FlowFile flowFile = session.create();
          transferFlowFile(flowFile, session, documentList, fileToProcess, nrOfRows, stopWatch);
       }
}


public void transferFlowFile(FlowFile flowFile, ProcessSession session, List<Map<String, Object>> documentList,
        FlowFile fileToProcess, AtomicLong nrOfRows, StopWatch stopWatch) {

    flowFile = session.write(flowFile, out -> {
        ObjectMapper mapper = new ObjectMapper();
        IOUtils.write(mapper.writeValueAsBytes(documentList), out);
    });

    documentList.clear();

    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");

    session.getProvenanceReporter().modifyContent(flowFile, "Retrieved " + nrOfRows.get() + " rows",
            stopWatch.getElapsed(TimeUnit.MILLISECONDS));

    session.transfer(flowFile, REL_SUCCESS);
}

最佳答案

之后调用session.commit()

session.transfer(flowFile, REL_SUCCESS)

自上次提交以来创建的任何流文件,或者如果从未提交过,则自开始以来创建的任何流文件都将在 session 提交时传输。

关于apache-nifi - 如何使用 NIFI 中的自定义处理器逐一传输流程文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45637403/

相关文章:

redis - Nifi自定义处理器如何获取redis的数据库连接并重用该连接

regex - 使用正则表达式从逗号分隔的字符串中提取值

apache-nifi - 什么是删除 nifi 流文件内容的最快方法?

apache-nifi - 清除 FetchDistributedMapCache 处理器的缓存

apache-nifi - 从流文件内容中提取多行内容

java - NIFI : No controller service types found that are applicable for this property

mongodb - 如何在 Nifi getMongo 查询字段中获取 ISO 字符串

configuration - 更改正在运行的 Kubernetes Pod 中的配置

apache-nifi - PostHTTP/InvokeHTTP 处理器的静态 header - NiFi

time - apache nifi 总执行时间