avro - How to read AVRO messages with Spring-Kafka via the Confluent Schema Registry?

Tags: avro spring-kafka confluent-platform confluent-schema-registry

How do I read AVRO messages with Spring-Kafka via the Confluent Schema Registry? Is there a sample? I can't find one in the official reference documentation.

Best Answer

The code below reads messages from the customer-avro topic. This is the AVRO value schema I defined:

{
     "type": "record",
     "namespace": "com.example",
     "name": "Customer",
     "version": "1",
     "fields": [
       { "name": "first_name", "type": "string", "doc": "First Name of Customer" },
       { "name": "last_name", "type": "string", "doc": "Last Name of Customer" },
       { "name": "age", "type": "int", "doc": "Age at the time of registration" },
       { "name": "height", "type": "float", "doc": "Height at the time of registration in cm" },
       { "name": "weight", "type": "float", "doc": "Weight at the time of registration in kg" },
       { "name": "automated_email", "type": "boolean", "default": true, "doc": "Field indicating if the user is enrolled in marketing emails" }
     ]
}
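
The com.example.Customer type used in the consumer below is the SpecificRecord class generated from this schema (typically via the Avro Maven plugin). As a rough sketch of what the generated class looks like in use, assuming the default builder and accessor names Avro derives from the field names above:

import com.example.Customer;

public class CustomerBuilderExample {

    public static void main(String[] args) {
        // Build a record with the generated builder; the setters mirror the schema fields.
        Customer customer = Customer.newBuilder()
                .setFirstName("John")
                .setLastName("Doe")
                .setAge(34)
                .setHeight(178.0f)
                .setWeight(75.0f)
                .setAutomatedEmail(false)
                .build();
        System.out.println(customer);
    }
}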

Below is a complete code snippet that reads this example with manual offset commits.

import com.example.Customer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Calendar;
import java.util.Collections;
import java.util.Properties;

public class KafkaAvroJavaConsumerV1Demo {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // normal consumer
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.put("group.id", "customer-consumer-group-v1");
        properties.put("auto.commit.enable", "false");
        properties.put("auto.offset.reset", "earliest");

        // avro part (deserializer)
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
        properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
        properties.setProperty("specific.avro.reader", "true");

        KafkaConsumer<String, Customer> kafkaConsumer = new KafkaConsumer<>(properties);
        String topic = "customer-avro";
        kafkaConsumer.subscribe(Collections.singleton(topic));

        System.out.println("Waiting for data...");

        while (true){
            System.out.println("Polling at " + Calendar.getInstance().getTime().toString());
            ConsumerRecords<String, Customer> records = kafkaConsumer.poll(1000);

            for (ConsumerRecord<String, Customer> record : records){
                Customer customer = record.value();
                System.out.println(customer);
            }

            kafkaConsumer.commitSync();
        }
    }
}
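
Note that the answer above uses the plain Kafka consumer API; with Spring-Kafka the same Confluent deserializer settings (schema.registry.url and specific.avro.reader) simply go into the consumer factory properties. Below is a minimal sketch of the Spring-Kafka equivalent, assuming spring-kafka and Confluent's kafka-avro-serializer are on the classpath; the class names AvroConsumerConfig and CustomerListener are just illustrative:

import com.example.Customer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class AvroConsumerConfig {

    @Bean
    public ConsumerFactory<String, Customer> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "customer-consumer-group-v1");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        // Confluent-specific settings: where the registry lives and that we want SpecificRecord types
        props.put("schema.registry.url", "http://127.0.0.1:8081");
        props.put("specific.avro.reader", "true");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Customer> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Customer> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

@Component
class CustomerListener {

    // The container deserializes the Avro payload into Customer before invoking this method.
    @KafkaListener(topics = "customer-avro")
    public void listen(Customer customer) {
        System.out.println(customer);
    }
}

With recent spring-kafka versions the listener container manages offset commits itself (BATCH ack mode by default), so an explicit commitSync() is not needed in the listener.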

Regarding avro - How to read AVRO messages with Spring-Kafka via the Confluent Schema Registry?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51979389/
