java - 卡夫卡 : Manually producing record triggers exception in consumer

标签 java apache-kafka avro

我有一个消费者从多个主题进行轮询。 到目前为止,我只使用 Java 对这些主题进行记录,一切都运行良好。

我将 confulent 工具与 avro 一起使用。

现在我尝试通过终端手动生成主题。

我创建了一个 avro 生产者,其架构与其他生产者使用的架构相同:

# Produce a record with one field
kafka-avro-console-producer \
  --broker-list 127.0.0.1:9092 --topic order_created-in \
  --property schema.registry.url=http://127.0.0.1:8081 \
  --property value.schema='{"type":"record","name":"test","fields":[{"name":"name","type":"string"},{"name":"APropertie","type":{"type":"array","items":{"type":"record","name":"APropertie","fields":[{"name":"key","type":"string"},{"name":"name","type":"string"},{"name":"date","type":"string"}]}}}]}'

之后,我生成一条遵循指定模式的记录:

{"name": "order_created", "APropertie": [{"key": "1", "name": "testname", "date": "testdate"}]}

记录已正确生成到主题。 但是我的 AvroConsumer 抛出异常:

Polling
Polling
Polling
Polling
Polling
Polling
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition order_created-in-0 at offset 1. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 61
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class test specified in writer's schema whilst finding reader's schema for a SpecificRecord.

Process finished with exit code 1

有什么提示吗? 谢谢!

最佳答案

与生产者/消费者的配置有关。

普通生产者有这样的配置:

        // normal producer
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("acks", "all");
        properties.setProperty("retries", "10");

Avro 通常添加以下属性:

        // avro part
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
        properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
        properties.setProperty("confluent.value.schema.validation", "true");
        properties.setProperty("confluent.key.schema.validation", "true");

这些必须包含在控制台生成器中。

关于java - 卡夫卡 : Manually producing record triggers exception in consumer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61972409/

相关文章:

java - Caliper:如何运行多个基准测试?

java - AS400 角色扮演游戏模拟器

java - 媒体播放器示例应用程序崩溃

python-3.x - 使用 Apache Beam Python 为每个窗口编写唯一的 Parquet 文件

apache-kafka - 在不提交来自 Kafka 10 消费者的情况下使用消息

hadoop - Parquet API 没有Keys 的概念?

java - Android:如何使用 ffmpeg 叠加视频和音频?

java - IBM CDC - Kafka 数据复制(LiveAudit 上的定制)

java - 使用自定义序列化器从 avro 读取时,RDD 中的运行时类型错误

maven - 尝试将数据流式传输到 Kafka 时出现 "Error registering Avro schema"