简要说明我想要实现的目标: 我想对 avro 记录的 kafka 流拓扑(使用 TopologyTestDriver)进行功能测试。
问题:无法“模拟”schemaRegistry 以自动执行模式发布/读取
到目前为止我尝试的是使用 MockSchemaRegistryClient 来尝试模拟 schemaRegistry,但我不知道如何将它链接到 Avro Serde。
代码
public class SyncronizerIntegrationTest {
private ConsumerRecordFactory<String, Tracking> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new SpecificAvroSerializer<>());
MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
@Test
void integrationTest() throws IOException, RestClientException {
Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streamsTest");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
props.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081"); //Dunno if this do anything? :/
StreamsBuilder kStreamBuilder = new StreamsBuilder();
Serde<Tracking> avroSerde = getAvroSerde();
mockSchemaRegistryClient.register(Tracking.getClassSchema().getName(), Tracking.getClassSchema());
KStream<String, Tracking> unmappedOrdersStream = kStreamBuilder.stream(
"topic",
Consumed.with(Serdes.String(), avroSerde));
unmappedOrdersStream
.filter((k, v) -> v != null).to("ouput");
Topology topology = kStreamBuilder.build();
TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));
}
}
AvroSerde 方法
private <T extends SpecificRecord> Serde<T> getAvroSerde() {
// Configure Avro ser/des
final Map<String,String> avroSerdeConfig = new HashMap<>();
avroSerdeConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");
final Serde<T> avroSerde = new SpecificAvroSerde<>();
avroSerde.configure(avroSerdeConfig, false); // `false` for record values
return avroSerde;
}
如果我在没有 testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));
的情况下运行测试,它运行良好(看起来一切都已正确解决)
但是
当我尝试插入数据(pipeInput)时,它抛出以下异常:对象“Tracking”已满。
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:184)
at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:270)
编辑了,我没有删除这个,因为“历史日志”提供了遵循的路径。
最佳答案
Confluent 提供了大量用于测试 Kafka (Streams) 以及 Schema Registry 的示例代码。
最重要的是,模拟并不是一个完整的集成测试——启动一个带有内存模式注册表的实际 Kafka 代理才是。
在上面的代码中,参见
@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
和
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
关于java - 如何使用 Avro (schemaRegistry) 对 Kafka Streams 进行功能测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52737242/