java - 来自 KafkaConsumer 的 NiFi 流文件属性

标签 java apache-spark apache-kafka kafka-consumer-api apache-nifi

我一直在尝试从 Spark Streaming 中的 Kafka 消息访问 NiFi Flowfile 属性。我使用 Java 作为语言。

场景是 NiFI 使用 GetSFTP 处理器从 FTP 位置读取二进制文件,并使用publishKafka 处理器将 byte[] 消息发布到 Kafka。使用 Spark Streaming 作业将这些 byte[] 属性转换为 ASCII 数据,并将这些解码后的 ASCII 写入 Kafka 进行进一步处理,并使用 NiFi 处理器保存到 HDFS。

我的问题是我无法跟踪二进制文件名和解码的 ASCII 文件。我必须在解码的 ASCII 中添加 header 部分(文件名、文件大小、记录计数等),但我无法弄清楚如何从 KafkaConsumer 对象的 NiFi Flowfile 访问文件名。有没有办法使用标准 NiFi 处理器来做到这一点?或者请分享任何其他建议来实现此功能。谢谢。

最佳答案

所以你的数据流是:

FTP -> NiFi -> Kafka -> Spark Streaming -> Kafka -> NiFi -> HDFS ?

目前 Kafka 在每条消息上都没有元数据属性(尽管我相信这可能会在 Kafka 0.11 中出现),因此当 NiFi 向某个主题发布消息时,它目前无法将流文件属性与消息。

您必须构建某种类型的包装数据格式(可能是 JSON 或 Avro),其中包含原始内容 + 您需要的附加属性,以便您可以将整个内容作为一条消息的内容发布到 Kafka。

另外,我不知道你在 Spark 流媒体工作中到底在做什么,但是你有什么理由不能只在 NiFi 中做这部分吗?听起来并不像涉及窗口或连接的任何复杂内容,因此您可以稍微简化一下事情并让 NiFi 进行解码,然后让 NiFi 将其写入 Kafka 并写入 HDFS。

关于java - 来自 KafkaConsumer 的 NiFi 流文件属性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44465320/

相关文章:

Java webapp Filter导致Tomcat启动服务器失败

java - 使用流进行从数组到列表的空安全转换

java - 如何将 AWT 中的类导入到我的 Android 项目中?

python - 使用 pem key 和客户端证书的 KAFKA SSL 连接

authentication - 基于客户端证书 DN 或其部分的 Kafka ACL to topic

apache-kafka - 主题消费率

javascript - kafka Node js 客户端压缩问题与 snappy

C# 等效于具有 init block 的 Java 匿名内部类

java - 如何优化 Spark RDD 上的 groupBy() 操作

java - 无法运行 JAR - 使用 Java 进行 Spark Twitter Streaming