我目前正在 Nifi 中从 FTP 获取文件,但在获取文件之前我必须检查一些条件。场景是这样的。
列出 FTP -> 检查条件 -> 获取 FTP
在“检查条件”部分中,我从数据库中获取了一些值并与文件名进行比较。那么我可以使用 update 属性从数据库中获取一些记录并使其像这样吗?
列出 FTP -> 更新属性(从数据库) -> 根据属性进行路由 -> 获取 FTP
最佳答案
我认为您的流程如下所示
流量:
1.ListFTP //to list the files
2.ExecuteSQL //to execute query in db(sample query:select max(timestamp) db_time from table)
3.ConvertAvroToJson //convert the result of executesql to json format
4.EvaluateJsonPath //keep destination as FlowfileAttribute and add new property as db_time as $.db_time
5.ROuteOnAttribute //perform check filename timestamp vs extracted timestamp by using nifi expresson language
6.FetchFile //if condition is true then fetch the file
RouteOnAttribute 配置:
我假设文件名类似于fn_2017-08-2012:09:10并且executesql已返回2017-08-2012:08:10
表达式:
${filename:substringAfter('_'):toDate("yyyy-MM-ddHH:mm:ss"):toNumber()
:gt(${db_time:toDate("yyyy-MM-ddHH:mm:ss"):toNumber()})}
通过使用上面的表达式,我们的文件名值与ListFTP
文件名相同,并且使用EvaluateJsonPath
处理器添加了db_time
属性,并且我们正在更改时间戳到数字然后进行比较。
引用this有关 NiFi 表达式语言的更多详细信息的链接。
关于apache-nifi - Nifi从数据库添加属性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51940731/