我正在 Nifi v 1.3 中编写自定义处理器
处理器执行从结果集中读取的 SQL 查询,并将每一行转换为 json 文档并将其存储到 ArrayList 中,最后它将每 1000 个文档(fetchSize 参数)传输到一个流文件,这对我来说很有效,但它发送了所有一次流文件。
我想要的是当我调用transferFlowFile方法时它独立地传输每个流文件,而不是等待onTrigger方法结束一次传输所有内容。
这里是代码:
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile fileToProcess = null;
if (context.hasIncomingConnection()) {
fileToProcess = session.get();
if (fileToProcess == null && context.hasNonLoopConnection()) {
return;
}
}
final ResultSet resultSet = st.executeQuery();
final ResultSetMetaData meta = resultSet.getMetaData();
final int nrOfColumns = meta.getColumnCount();
List<Map<String, Object>> documentList = new ArrayList<>();
while (resultSet.next()) {
final AtomicLong nrOfRows = new AtomicLong(0L);
cpt++;
Map<String, Object> item = new HashMap<>();
for (int i = 1; i <= nrOfColumns; i++) {
int javaSqlType = meta.getColumnType(i);
String nameOrLabel = StringUtils.isNotEmpty(meta.getColumnLabel(i)) ? meta.getColumnLabel(i)
: meta.getColumnName(i);
Object value = null;
value = resultSet.getObject(i);
if (value != null) {
item.put(nameOrLabel, value.toString());
}
}
Document document = new Document(item);
documentList.add(document);
if (fetchSize!=0 && cpt % fetchSize == 0) {
FlowFile flowFile = session.create();
transferFlowFile(flowFile, session, documentList, fileToProcess, nrOfRows, stopWatch);
}
}
if (!documentList.isEmpty()) {
final AtomicLong nrOfRows = new AtomicLong(0L);
FlowFile flowFile = session.create();
transferFlowFile(flowFile, session, documentList, fileToProcess, nrOfRows, stopWatch);
}
}
public void transferFlowFile(FlowFile flowFile, ProcessSession session, List<Map<String, Object>> documentList,
FlowFile fileToProcess, AtomicLong nrOfRows, StopWatch stopWatch) {
flowFile = session.write(flowFile, out -> {
ObjectMapper mapper = new ObjectMapper();
IOUtils.write(mapper.writeValueAsBytes(documentList), out);
});
documentList.clear();
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
session.getProvenanceReporter().modifyContent(flowFile, "Retrieved " + nrOfRows.get() + " rows",
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
}
最佳答案
之后调用session.commit()
session.transfer(flowFile, REL_SUCCESS)
自上次提交以来创建的任何流文件,或者如果从未提交过,则自开始以来创建的任何流文件都将在 session 提交时传输。
关于apache-nifi - 如何使用 NIFI 中的自定义处理器逐一传输流程文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45637403/