apache-kafka - Kafka Connect 反序列化字节数组

标签 apache-kafka avro apache-kafka-connect

我正在尝试借助 Kafka 连接接收字节数组序列化 Avro 消息。 用于序列化 avro 数据的生产者配置

key.serializer-org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer-org.apache.kafka.common.serialization.ByteArraySerializer

hdfs接收器配置

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=csvtopic
hdfs.url=hdfs://10.15.167.119:8020
flush.size=3
locale=en-us
timezone=UTC
partitioner.class=io.confluent.connect.hdfs.partitioner.HourlyPartitioner
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter.schema.registry.url=http://localhost:8081
hive.metastore.uris=thrift://10.15.167.119:9083
hive.integration=true
schema.compatibility=BACKWARD

如果我从 hdfs Quickstart-hdfs.properties 中删除 hive 集成和 format.class,我就能够将数据保存到 HDFS 中。 启用配置单元集成后,我收到以下异常堆栈跟踪

java.lang.RuntimeException: org.apache.kafka.connect.errors.SchemaProjectorException: Schema version required for BACKWARD compatibility
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:401)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:101)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

如何反序列化从Kafka主题接收到的字节流并将其保存在hive中?

最佳答案

如果您将 Avro 与架构注册表一起用于消息,则应该使用 AvroConverter 而不是 ByteArrayConverter,即:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

关于apache-kafka - Kafka Connect 反序列化字节数组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49554659/

相关文章:

java - 为什么 Apache Kafka 消费者不使用 Log4j2 根记录器?

java - Storm UI 未显示正确确认

apache-kafka - Kafka 会改变二进制数据吗?

amazon-s3 - 将 Kafka 中的 Avro 转换为 Parquet 直接转入 S3

mongodb - 当使用 debezium 从 mongoDB 读取时,KafkaConnect 生成具有空值的 CDC 事件

java - 重启 Storm 时再次处理来自 Kafka 的所有预处理记录

asynchronous - 具有异步 goroutines 的 Kafka 消费者

python - 在 python 中使用 snappy 和 avro 时出现问题

java - 如何在Spark 1.3.1中使用Java读取AVRO数据?

java - Kafka连接confluent elasticsearch sink(找不到类报错)