java - 如何使用 Avro (schemaRegistry) 对 Kafka Streams 进行功能测试?

标签 java apache-kafka avro apache-kafka-streams

  • 简要说明我想要实现的目标: 我想对 avro 记录的 kafka 流拓扑(使用 TopologyTestDriver)进行功能测试。

  • 问题:无法“模拟”schemaRegistry 以自动执行模式发布/读取

到目前为止我尝试的是使用 MockSchemaRegistryClient 来尝试模拟 schemaRegistry,但我不知道如何将它链接到 Avro Serde。

代码

public class SyncronizerIntegrationTest {


    private ConsumerRecordFactory<String, Tracking> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new SpecificAvroSerializer<>());

    MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();


    @Test
    void integrationTest() throws IOException, RestClientException {


        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streamsTest");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
        props.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081"); //Dunno if this do anything? :/
        StreamsBuilder kStreamBuilder = new StreamsBuilder();
        Serde<Tracking> avroSerde = getAvroSerde();
        mockSchemaRegistryClient.register(Tracking.getClassSchema().getName(), Tracking.getClassSchema());


        KStream<String, Tracking> unmappedOrdersStream = kStreamBuilder.stream(
                "topic",
                Consumed.with(Serdes.String(), avroSerde));

        unmappedOrdersStream
                .filter((k, v) -> v != null).to("ouput");

        Topology topology = kStreamBuilder.build();
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);

        testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));

    }
}

AvroSerde 方法

private <T extends SpecificRecord> Serde<T> getAvroSerde() {

    // Configure Avro ser/des
    final Map<String,String> avroSerdeConfig = new HashMap<>();
    avroSerdeConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");

    final Serde<T> avroSerde = new SpecificAvroSerde<>();
    avroSerde.configure(avroSerdeConfig, false); // `false` for record values
    return avroSerde;
}

如果我在没有 testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking())); 的情况下运行测试,它运行良好(看起来一切都已正确解决)

但是

当我尝试插入数据(pipeInput)时,它抛出以下异常:对象“Tracking”已满。

org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
    at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:184)
    at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:270)

编辑了,我没有删除这个,因为“历史日志”提供了遵循的路径。

最佳答案

Confluent 提供了大量用于测试 Kafka (Streams) 以及 Schema Registry 的示例代码。

https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java

最重要的是,模拟并不是一个完整的集成测试——启动一个带有内存模式注册表的实际 Kafka 代理才是。

在上面的代码中,参见

@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();

streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());

关于java - 如何使用 Avro (schemaRegistry) 对 Kafka Streams 进行功能测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52737242/

相关文章:

java - 在 Spring 实现授权码授予时出现的问题

apache-kafka - 使用 Apache Storm 获取数据

apache-kafka - 如何清除消息中心主题?

java - java中Avro不可变生成的类

hadoop - 有没有办法使用Hive进行访问存储在HBase中的Avro数据的分析

java - 如何以编程方式在你的安卓应用程序中打开谷歌教室

java - Android:如何从 fragment 中的主动行为中获取媒体播放器对象?

java - Spring Boot Logback 记录 DEBUG 消息

apache-kafka - 是否有 Apache Kafka 消费者延迟检查器?

java - 使用来自不同包的 Avro 模式