java - Confluence JDBC 连接器和 Flink 消费者

标签 java apache-flink avro confluent-platform confluent-schema-registry

我们正在尝试将 SQL-Server JDBC Connector 与 KafkaAvroSerializer 结合使用,并提供定制的 ProducerInterceptor 在将数据发送到 Kafka 之前对其进行加密。

在消费者方面,我们希望使用 Flink 连接器进行解密,然后使用适当的反序列化器。

为了实现这一目标,我们有几个问题:

1)如果我们提供自定义ConsumerInterceptor来解密数据,那么我们在Flink中创建DataStream时是否应该通过Properties文件传入?

Properties properties = new Properties();
        ...
    `properties.setProperty("consumer.interceptor.classes": "OurCusromDecryptConsumerInterceptor")`;
    ...

    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer011<>("sqlserver-foobar", ???, properties));

以上配置是否正确,或者我是否需要设置任何其他属性,以便将 ConsumerInterceptor 传递给 Flink?

2)另一个问题是关于Flink中的Deserializer。我在网上查了一下,发现了一些代码片段,如下所示:

public class ConfluentAvroDeserializationSchema implements DeserializationSchema<Type ??> { 

    private final String schemaRegistryUrl; 
    private final int identityMapCapacity; 
    private KafkaAvroDecoder kafkaAvroDecoder; 

    public ConfluentAvroDeserializationSchema(String schemaRegistyUrl) { 
        this(schemaRegistyUrl, 1000); 
    }

因此,如果我们使用 JDBC 连接器将数据传递到 Kafka,而不进行任何修改(除了加密数据之外),那么我们应该在反序列化期间提供什么数据类型?我们将在反序列化之前解密数据。

public class ConfluentAvroDeserializationSchema implements DeserializationSchema<Type ??> { 

提前致谢,

最佳答案

只需添加最终结果,以便它可以帮助任何正在寻找相同结果的人:

public class ConfluentAvroDeserializationSchema
            implements DeserializationSchema<GenericRecord> {

        private final String schemaRegistryUrl;
        private final int identityMapCapacity;
        private transient KafkaAvroDecoder kafkaAvroDecoder;


        public ConfluentAvroDeserializationSchema(String schemaRegistyUrl) {
            this(schemaRegistyUrl, 1000);
        }

        public ConfluentAvroDeserializationSchema(String schemaRegistryUrl, int
                identityMapCapacity) {
            this.schemaRegistryUrl = schemaRegistryUrl;
            this.identityMapCapacity = identityMapCapacity;
        }

        @Override
        public GenericRecord deserialize(byte[] bytes) throws IOException {
            if (kafkaAvroDecoder == null) {
                SchemaRegistryClient schemaRegistry = new
                        CachedSchemaRegistryClient(this.schemaRegistryUrl,
                        this.identityMapCapacity);
                this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
            }
            return (GenericRecord) this.kafkaAvroDecoder.fromBytes(bytes);
        }

        @Override
        public boolean isEndOfStream(GenericRecord string) {
            return false;
        }

        @Override
        public TypeInformation<GenericRecord> getProducedType() {
            return TypeExtractor.getForClass(GenericRecord.class);
        }
    }

关于java - Confluence JDBC 连接器和 Flink 消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49948201/

相关文章:

apache-flink - KeyedStream 中的 max 和 maxBy 有什么区别

hadoop - 如何在 Flink 独立集群上的 Flink 作业中使用两个 Kerberos key 表(用于 Kafka 和 Hadoop HDFS)?

java - Flink State过期时触发

hadoop - AVRO 文件上的 Hive 外部表仅为所有列生成 NULL 数据

scala - 如何使用 Spark Streaming 从 Kafka 读取二进制序列化的 Avro(Confluent Platform)

java - 第二个 Activity 的 TextView 内的文本超出了屏幕,但适用于更大的显示屏手机

java - 如何在 Mac OS Mountain Lion 上的 Java 1.7 中安装 JCE?

scala - 如何将Avro的GenericData.Record的RDD转换为DataFrame?

java - 需要收集控制台应用程序输出并存储它

Java EE session : Can you store some attributes as non-persistent?