我一直在尝试从 Spark Streaming 中的 Kafka 消息访问 NiFi Flowfile 属性。我使用 Java 作为语言。
场景是 NiFI 使用 GetSFTP 处理器从 FTP 位置读取二进制文件,并使用publishKafka 处理器将 byte[] 消息发布到 Kafka。使用 Spark Streaming 作业将这些 byte[] 属性转换为 ASCII 数据,并将这些解码后的 ASCII 写入 Kafka 进行进一步处理,并使用 NiFi 处理器保存到 HDFS。
我的问题是我无法跟踪二进制文件名和解码的 ASCII 文件。我必须在解码的 ASCII 中添加 header 部分(文件名、文件大小、记录计数等),但我无法弄清楚如何从 KafkaConsumer 对象的 NiFi Flowfile 访问文件名。有没有办法使用标准 NiFi 处理器来做到这一点?或者请分享任何其他建议来实现此功能。谢谢。
最佳答案
所以你的数据流是:
FTP -> NiFi -> Kafka -> Spark Streaming -> Kafka -> NiFi -> HDFS ?
目前 Kafka 在每条消息上都没有元数据属性(尽管我相信这可能会在 Kafka 0.11 中出现),因此当 NiFi 向某个主题发布消息时,它目前无法将流文件属性与消息。
您必须构建某种类型的包装数据格式(可能是 JSON 或 Avro),其中包含原始内容 + 您需要的附加属性,以便您可以将整个内容作为一条消息的内容发布到 Kafka。
另外,我不知道你在 Spark 流媒体工作中到底在做什么,但是你有什么理由不能只在 NiFi 中做这部分吗?听起来并不像涉及窗口或连接的任何复杂内容,因此您可以稍微简化一下事情并让 NiFi 进行解码,然后让 NiFi 将其写入 Kafka 并写入 HDFS。
关于java - 来自 KafkaConsumer 的 NiFi 流文件属性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44465320/