我们有多个(50 多个)nifi 流,它们基本上都做同样的事情:从数据库中提取一些数据,将一些列附加到 parquet 并上传到 hdfs。它们仅在细节上有所不同,例如要运行的 sql 查询或它们在 hdfs 中的位置。
问题是如何分解这些公共(public) nifi 流,以便对公共(public)流所做的任何更改自动应用于所有派生流。例如,如果我想添加一个额外的步骤来将数据也发布到 Kafka,我想做一次并让它自动应用于所有 50 个流。
我们试图让它与 nifi registry 一起工作,但它似乎不太适合。从本质上讲,问题在于 nifi 注册表似乎很适合在一个环境(比如 wat)中更新流程,然后在另一个环境(比如 prod)中自动更新它。它似乎不太适合在同一环境中使用一个特定示例更新多个流,每次我们重新部署时它都会将每个流的名称重置为模板名称,这意味着所有流最终都具有相同的名称!
有谁知道应该如何处理像我们这样的情况,因为我猜这一定很常见。
最佳答案
Apache NiFi 有 ProcessorGroups
。顾名思义,处理器组用于将一组处理器及其执行类似任务的管道组合在一起。
因此,对于您的情况,您可以通过将可与不同管道重复使用的公共(public)流程移动到具有输入端口的单独处理器组来重构流程。通过连接到可重用处理器组的输入端口来连接依赖于此可重用流的外部流。根据您的要求,您也可以在此处理器组中创建一个输出端口并将其与外部流连接。
附加示例:
为了便于解释,我制作了一个模拟流程,因此请忽略使用的 Processor
类型,而是查看我为这些处理器指定的名称。
以下屏幕截图显示我从两个不同的来源读取并将它们分别连接到两个不同的处理器,这些处理器对这些处理器进行特定于源的更改
然后,我将这两个流连接到内部具有可重用流的处理器组的输入端口。因此,最终上面屏幕截图中显示的两个不同流程可以使用一个通用的可重用流程。
显示可重用流程中的内容:
最后输出端口output to outside
将可重用流连接到外部组件Write to somewehere
我希望这可以帮助您重构复杂的流程。如果您有任何疑问,请随时回来。
关于apache-nifi - nifi 的多个流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49789401/