apache-kafka - 在没有 Confluent 组件的情况下从 Kafka 生成和使用 Avro 消息

标签 apache-kafka kafka-consumer-api kafka-producer-api

我试图找到一个可以从 kafka 生成和订阅 avro 消息的示例。

此时,我想使用没有任何融合附加组件的“普通”kafka 部署。

这可能吗?到目前为止,我发现的所有示例都很快开始使用融合的特定工具来处理 avro 消息。

我确信应该有一种方法可以让我在 kafka 平台上发布和使用 avro 消息,并且没有任何“特定于发行版”的插件。

最佳答案

当然,您可以在没有任何 Confluent 工具的情况下做到这一点。但是你必须在你这边做额外的工作(例如在你的应用程序代码中)——这是提供 Avro 相关工具的最初动机,比如你提到的 Confluent 工具。

一种选择是使用 Apache Avro Java API 手动序列化/反序列化 Kafka 消息的有效负载(例如从 YourJavaPojobyte[])直接地。 (我想你暗示 Java 是首选的编程语言。)这会是什么样子?这是一个例子。

  • 首先,您将在将数据写入 Kafka 的应用程序中手动序列化数据负载。在这里,您可以使用 Avro 序列化 API 对有效负载进行编码(从 Java pojo 到 byte[] ),然后使用 Kafka 的 Java 生产者客户端将编码的有效负载写入 Kafka 主题。
  • 然后,在数据管道的下游,您将在另一个从 Kafka 读取数据的应用程序中反序列化。在这里,您可以使用 Kafka 的 Java 消费者客户端从同一 Kafka 主题读取(编码)数据,并使用 Avro 反序列化 API 再次解码有效负载(从 byte[] 到 Java pojo)。

  • 当然,在使用 Kafka Streams(将包含在即将推出的 Apache Kafka 0.10 中)或 Apache Storm 等流处理工具时,您也可以直接使用 Avro API。

    最后,您还可以选择使用一些实用程序库(无论是来自 Confluent 还是其他地方),这样您就不必直接使用 Apache Avro API。就其值(value)而言,我在 kafka-storm-starter 上发布了一些稍微复杂的示例。 ,例如如 AvroDecoderBolt.scala 所示.此处,Avro 序列化/反序列化是通过使用 Scala 库 Twitter Bijection 完成的。 .这是 AvroDecoderBolt.scala 的示例片段给你一个大致的想法:
      // This tells Bijection how to automagically deserialize a Java type `T`,
      // given a byte array `byte[]`.
      implicit private val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
    SpecificAvroCodecs.toBinary[T]
    
      // Let's put Bijection to use.
      private def decodeAndEmit(bytes: Array[Byte], collector: BasicOutputCollector) {
        require(bytes != null, "bytes must not be null")
        val decodeTry = Injection.invert(bytes)  // <-- deserialization, using Twitter Bijection, happens here
        decodeTry match {
          case Success(pojo) =>
            log.debug("Binary data decoded into pojo: " + pojo)
            collector.emit(new Values(pojo)) // <-- Here we are telling Storm to send the decoded payload to downstream consumers
            ()
          case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e))
        }
      }
    

    所以是的,您当然可以选择不使用任何其他库,例如 Confluent 的 Avro 序列化器/反序列化器(目前作为 confluentinc/schema-registry 的一部分提供)或 Twitter's Bijection .是否值得付出额外的努力由您来决定。

    关于apache-kafka - 在没有 Confluent 组件的情况下从 Kafka 生成和使用 Avro 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37290303/

    相关文章:

    java - Kafka Stream Application删除IDE中state目录失败

    apache-kafka - Kafka 在某些节点上分区不同步

    java - 如何知道java kafka应用程序客户端中是否达到了max.poll.interval.ms?

    java - Kafka 0.11 中 sendOffsetsToTransaction 的含义

    apache-kafka - 当 Kafka Broker 关闭并恢复时,kafka 生产者中的数据丢失

    apache-kafka - 在 JAAS 或 Kafka 配置(不是 Kerberos)中没有定义 serviceName

    authorization - 为什么kafka不像activemq那样支持简单的用户名/密码授权?

    java - 什么数据格式被认为在 Kafka 上写入速度最快?

    java - Kafka 2消费者工厂监听器没有持续连接

    apache-kafka - 强制 kafka 通过 IP 连接代理,而不是通过主机名