apache-kafka - Spring Embedded Kafka + Mock Schema Registry : State Store ChangeLog Schema not registered

Tags: apache-kafka apache-kafka-streams confluent-schema-registry embedded-kafka

I am building integration tests for our Kafka system using the Spring Embedded Kafka Broker together with a MockSchemaRegistryClient. I am writing a test for one of the stream topologies built with the Streams API (KStreamBuilder). This particular topology has a KStream (stream1) feeding into a KTable (table1).

When I feed input into stream1, I hit an error that originates from table1's KTableProcessor:

Exception in thread "mortgage-kafka-consumers-it-c1dd9185-ce16-415c-ad82-293c1281c897-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=streaming.mortgage.application_party, partition=0, offset=0
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:202)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 6
Caused by: java.io.IOException: Cannot get schema from schema registry!
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:106)
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getBySubjectAndID(MockSchemaRegistryClient.java:149)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54)
    at com.sofi.kafka.serialization.AvroDeserializer.deserialize(AvroDeserializer.java:35)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:151)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:135)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:62)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:45)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:131)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:188)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)

The KTableProcessor is attempting to deserialize an entry from the RocksDB state store; however, the schema does not exist in the mock schema registry. The topic whose schema is being requested is: **appname-KTABLE-SOURCE-STATE-STORE-0000000000-changelog**

As the exception states, the schema has not been registered. However, the topic **appname-KTABLE-SOURCE-STATE-STORE-0000000000-changelog-key** does have a registered schema (registered when the entry's key is serialized for the query). 

Since this is an internal topic, I don't expect to have to register this schema myself, yet the test fails because the schema is absent from the registry. Is there a way to have changelog schemas registered prior to data ingestion? Is there a way to disable state store changelogging with the KStreamBuilder?

Thanks in advance!

Best Answer

Solved it, and I'll now sheepishly explain: when using a KTable (via the Streams API) with an embedded Kafka broker, you need to configure the KafkaStreams object with a state store directory that is unique to each run of the embedded broker (in my case, each run of the tests).

You can control the state store directory via the StreamsConfig.STATE_DIR_CONFIG setting. I made it unique by appending a timestamp to the default state store directory:

properties.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kraken-streams/" + LocalDateTime.now().toString());

The problem was that every time the embedded Kafka broker was initialized, a stale state store from a previous run already existed at the same location. When the first record arrived on the KTable's topic, the state store was therefore able to return a previous value. This triggered an attempt to deserialize a state store record that had never been serialized as far as this schema registry instance was concerned. Schemas are registered only on serialization, so the deserialization attempt failed due to the missing registered schema.
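A minimal sketch of building such a per-run unique state directory in plain Java (the `withUniqueStateDir` helper and the `/tmp/kraken-streams` base path are illustrative; `"state.dir"` is the config key behind `StreamsConfig.STATE_DIR_CONFIG`; a formatted timestamp is used instead of `LocalDateTime.toString()` since the latter contains colons, which some filesystems reject):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

public class UniqueStateDir {
    // Hypothetical helper: returns Properties pointing "state.dir"
    // (StreamsConfig.STATE_DIR_CONFIG) at a directory unique per test run,
    // so stale RocksDB state from an earlier run is never picked up.
    static Properties withUniqueStateDir(String baseDir) {
        Properties props = new Properties();
        String suffix = LocalDateTime.now()
                .format(DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss-SSS"));
        props.put("state.dir", baseDir + "/" + suffix);
        return props;
    }

    public static void main(String[] args) throws InterruptedException {
        Properties a = withUniqueStateDir("/tmp/kraken-streams");
        Thread.sleep(5); // ensure a later millisecond timestamp
        Properties b = withUniqueStateDir("/tmp/kraken-streams");
        // Two initializations resolve to distinct state directories.
        System.out.println(a.getProperty("state.dir"));
        System.out.println(b.getProperty("state.dir"));
    }
}
```

The resulting `Properties` would be merged into the usual Streams configuration before constructing the `KafkaStreams` instance. (As a side note, `KafkaStreams#cleanUp()`, called before `start()`, is another way to remove local state between runs.)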

Regarding apache-kafka - Spring Embedded Kafka + Mock Schema Registry: State Store ChangeLog Schema not registered, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50917589/
