java - Avro 和 Kafka 通过使用 SchemaBuilder

标签 java apache-kafka avro

我完成了来自 baeldung 的教程.他们提到有两种创建模式的方法。

  • 通过编写 json 表示并添加 maven 插件来生成类
  • 通过使用 SchemaBuilder,他们也提到这是更好的选择。

不幸的是,在 git 示例中我只看到了 json 方式。

假设我有这个 Avro 模式:

{
  "type":"record",
  "name":"TestFile",
  "namespace":"com.example.kafka.data.ingestion.model",
  "fields":[
    {
      "name":"date",
      "type":"long"
    },
    {
      "name":"counter",
      "type":"int"
    },
    {
      "name":"mc",
      "type":"string"
    }
  ]
}

通过在我的 pom 文件中添加这个插件:

<plugin>
   <groupId>org.apache.avro</groupId>
   <artifactId>avro-maven-plugin</artifactId>
   <version>1.8.0</version>
   <executions>
      <execution>
         <id>schemas</id>
         <phase>generate-sources</phase>
         <goals>
            <goal>schema</goal>
            <goal>protocol</goal>
            <goal>idl-protocol</goal>
         </goals>
         <configuration>
            <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
         </configuration>
      </execution>
   </executions>
</plugin>

并使用 generate-sources 构建一个 TestFile.java 到我说的目的地。 然后为了发送到 kafka 主题,我可以执行以下操作:

TestFile test = TestFile.newBuilder()
                                            .setDate(102928374747)
                                            .setCounter(2)
                                            .setMc("Some string")
                                            .build();
kafkaTemplate.send(topicName, test);

SchemaBuilder 创建模式的等价物是:

Schema testFileSchema = SchemaBuilder   .record("TestFile")
                                            .namespace("com.example.kafka.data.ingestion.model")
                                            .fields()
                                            .requiredLong("date")
                                            .requiredInt("counter")
                                            .requiredString("mc")
                                            .endRecord();

但是我现在如何生成 POJO 并将我的 TestFile 数据发送到我的 kafka 主题?

最佳答案

您将无权访问 TestFile对象,因为架构是在运行时创建的,而不是预编译的。如果你想保留那个 POJO,那么你需要一个 public TestFile(GenericRecord avroRecord) 的构造函数

您需要创建一个 GenericRecord使用那个Schema对象,就像您从字符串或文件中解析它一样。

例如,

Schema schema = SchemaBuilder.record("TestFile")
            .namespace("com.example.kafka.data.ingestion.model")
            .fields()
            .requiredLong("date")
            .requiredInt("counter")
            .requiredString("mc")
            .endRecord();

GenericRecord entry1 = new GenericData.Record(schema);
entry1.put("date", 1L);
entry1.put("counter", 2);
entry1.put("mc", "3");

// producer.send(new ProducerRecord<>(topic, entry1);

一个完整的 Kafka 例子是 available from Confluent

如果你不包含必填字段,它会抛出一个错误,并且不会检查类型的值(我可以放 "counter", "2" ,它会发送一个字符串值(这似乎是对我来说是一个错误。基本上,GenericRecord == HashMap<String, Object> 具有必需/可空字段的额外好处。

并且您将需要配置一个 Avro 序列化程序,例如 Confluent 的序列化程序,这需要运行其 Schema Registry 或类似 Cloudera shows 的版本

否则,您需要将 Avro 对象转换为 byte[] (如您的链接所示,只需使用 ByteArraySerializer

关于java - Avro 和 Kafka 通过使用 SchemaBuilder,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54196221/

相关文章:

java - 当执行在 xml 中定义的测试套件时,是否可以假设套件中的所有测试都在同一个 JVM 中执行?

java - 如何让 Avro 架构验证支持字段别名?

java - 具有解码器问题的 Kafka Avro 消费者

java - 解析嵌套 avro 文件

java - 一个支持集群的 IoC 框架 - 您认为它应该做什么?

java - CardLayout 无法正常工作

java - 如何将 JDateChooser 的日期设置为 NULL 或 "0000-00-00"

java - 在 Kafka Stream 中处理消息时发生错误时重新处理消息

apache-spark - 开发人员之间是否可以共享/访问hdfs?

java - Kafka - 反序列化消费者中的对象