我试图找到一个可以从 kafka 生成和订阅 avro 消息的示例。
此时,我想使用没有任何融合附加组件的“普通”kafka 部署。
这可能吗?到目前为止,我发现的所有示例都很快开始使用融合的特定工具来处理 avro 消息。
我确信应该有一种方法可以让我在 kafka 平台上发布和使用 avro 消息,并且没有任何“特定于发行版”的插件。
最佳答案
当然,您可以在没有任何 Confluent 工具的情况下做到这一点。但是你必须在你这边做额外的工作(例如在你的应用程序代码中)——这是提供 Avro 相关工具的最初动机,比如你提到的 Confluent 工具。
一种选择是使用 Apache Avro Java API 手动序列化/反序列化 Kafka 消息的有效负载(例如从 YourJavaPojo
到 byte[]
)直接地。 (我想你暗示 Java 是首选的编程语言。)这会是什么样子?这是一个例子。
byte[]
),然后使用 Kafka 的 Java 生产者客户端将编码的有效负载写入 Kafka 主题。 byte[]
到 Java pojo)。 当然,在使用 Kafka Streams(将包含在即将推出的 Apache Kafka 0.10 中)或 Apache Storm 等流处理工具时,您也可以直接使用 Avro API。
最后,您还可以选择使用一些实用程序库(无论是来自 Confluent 还是其他地方),这样您就不必直接使用 Apache Avro API。就其值(value)而言,我在 kafka-storm-starter 上发布了一些稍微复杂的示例。 ,例如如 AvroDecoderBolt.scala 所示.此处,Avro 序列化/反序列化是通过使用 Scala 库 Twitter Bijection 完成的。 .这是
AvroDecoderBolt.scala
的示例片段给你一个大致的想法: // This tells Bijection how to automagically deserialize a Java type `T`,
// given a byte array `byte[]`.
implicit private val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
SpecificAvroCodecs.toBinary[T]
// Let's put Bijection to use.
private def decodeAndEmit(bytes: Array[Byte], collector: BasicOutputCollector) {
require(bytes != null, "bytes must not be null")
val decodeTry = Injection.invert(bytes) // <-- deserialization, using Twitter Bijection, happens here
decodeTry match {
case Success(pojo) =>
log.debug("Binary data decoded into pojo: " + pojo)
collector.emit(new Values(pojo)) // <-- Here we are telling Storm to send the decoded payload to downstream consumers
()
case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e))
}
}
所以是的,您当然可以选择不使用任何其他库,例如 Confluent 的 Avro 序列化器/反序列化器(目前作为 confluentinc/schema-registry 的一部分提供)或 Twitter's Bijection .是否值得付出额外的努力由您来决定。
关于apache-kafka - 在没有 Confluent 组件的情况下从 Kafka 生成和使用 Avro 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37290303/