java - Deserializing Avro in Apache Flink using Confluent

Tags: java apache-kafka apache-flink avro confluent-schema-registry

I am trying to deserialize Avro messages coming from Kafka in Apache Flink.

I am currently doing this by implementing the DeserializationSchema interface, but that approach is deprecated. Is there a better way to achieve this?

Thanks.

These are my classes:

public final class FlinkKafkaExample {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set up the Kafka consumer (source)
        ConfluentAvroDeserializationSchema deserSchema =
                new ConfluentAvroDeserializationSchema("http://localhost:8081", 1000);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "0.0.0.0:9092");
        kafkaProps.setProperty("zookeeper.connect", "0.0.0.0:2181");
        kafkaProps.setProperty("group.id", "org.apache.flink");
        FlinkKafkaConsumer08<String> flinkKafkaConsumer =
                new FlinkKafkaConsumer08<String>("conekta.public.codes", deserSchema, kafkaProps);

        DataStream<String> kafkaStream = env.addSource(flinkKafkaConsumer);

        kafkaStream.print();
        env.execute("Flink Kafka Java Example");
    }
}

public class ConfluentAvroDeserializationSchema implements DeserializationSchema<String> {

    private final String schemaRegistryUrl;
    private final int identityMapCapacity;
    private KafkaAvroDecoder kafkaAvroDecoder;

    public ConfluentAvroDeserializationSchema(String schemaRegistryUrl, int identityMapCapacity) {
        this.schemaRegistryUrl = schemaRegistryUrl;
        this.identityMapCapacity = identityMapCapacity;
    }

    @Override
    public String deserialize(byte[] message) {
        // Lazily create the decoder: it is not serializable, so it cannot be
        // built in the constructor and shipped with the job graph.
        if (kafkaAvroDecoder == null) {
            SchemaRegistryClient schemaRegistry =
                    new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
            this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
        }
        return this.kafkaAvroDecoder.fromBytes(message).toString();
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
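For context on what KafkaAvroDecoder.fromBytes has to deal with: messages produced with the Confluent serializer carry a 5-byte header (a magic byte of 0, then a 4-byte big-endian schema-registry ID) before the Avro payload, which is why the decoder needs a SchemaRegistryClient at all. A self-contained sketch of that framing (the class and method names here are illustrative, not part of any library):

```java
import java.nio.ByteBuffer;

// Illustrative parser for the Confluent wire format:
// byte 0  -> magic byte (always 0)
// bytes 1-4 -> schema ID, big-endian int, looked up in the Schema Registry
// bytes 5+  -> the Avro-encoded record
public class ConfluentWireFormat {
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_SIZE = 5;

    // Extract the schema-registry ID from a Confluent-framed message.
    public static int schemaId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt(); // ByteBuffer reads big-endian by default
    }

    // Return the raw Avro payload that follows the 5-byte header.
    public static byte[] payload(byte[] message) {
        byte[] out = new byte[message.length - HEADER_SIZE];
        System.arraycopy(message, HEADER_SIZE, out, 0, out.length);
        return out;
    }
}
```

The schema ID tells the deserializer which writer schema to fetch from the registry before decoding the payload.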

Best answer

Regarding "java - Deserializing Avro in Apache Flink using Confluent", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/59885325/
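Instead of hand-rolling a DeserializationSchema around the deprecated KafkaAvroDecoder, Flink ships registry-aware Avro support in the flink-avro-confluent-registry module. A minimal sketch of that approach, assuming that dependency and a newer Kafka connector; the reader schema and topic name below are placeholders:

```java
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ConfluentRegistryExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder reader schema; in practice this would match the schema
        // registered for the topic in the Confluent Schema Registry.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Code\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"string\"}]}");

        // Built-in schema that fetches writer schemas from the registry,
        // replacing the hand-written ConfluentAvroDeserializationSchema.
        ConfluentRegistryAvroDeserializationSchema<GenericRecord> deserSchema =
                ConfluentRegistryAvroDeserializationSchema.forGeneric(
                        schema, "http://localhost:8081");

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "0.0.0.0:9092");
        kafkaProps.setProperty("group.id", "org.apache.flink");

        DataStream<GenericRecord> kafkaStream = env.addSource(
                new FlinkKafkaConsumer<>("conekta.public.codes", deserSchema, kafkaProps));

        kafkaStream.print();
        env.execute("Flink Confluent Registry Example");
    }
}
```

This yields GenericRecord elements directly instead of stringified records, so downstream operators can access fields by name; forSpecific(...) is the analogous factory when generated Avro classes are available.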
