apache-kafka - 即使 json 数据包含架构和有效负载字段,kafka 连接 hdfs 接收器连接器也失败

标签 apache-kafka hdfs apache-kafka-connect

我正在尝试使用 kafka 连接 hdfs 接收器连接器将 json 数据从 kafka 移动到 hdfs。

即使 kafka 中的 json 数据具有架构和有效负载 kafka 连接任务也失败并出现错误

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields.

Kafka中的数据:

./bin/kafka-console-consumer --topic test_hdfs_json_schema_payload_1 --zookeeper localhost:2181 --from-beginning
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "deepak","company": "BT"}}
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "sufi","company": "BT"}}
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "vikas","company": "BT"}}

使用以下命令提交 HDFS 接收器作业:
curl -X POST -H "Content-Type: application/json" --data '{"name": "connect-cluster-15may-308pm", "config": {"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max":"2", "hdfs.url": "hdfs://localhost:9000","flush.size": "1","topics":"test_hdfs_json_schema_payload_1","topics.dir":"/deepak/kafka-connect/trial1"}}' http://localhost:8083/connectors

分布式 kafka 连接工作器配置:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

错误信息:

http://localhost:8083/connectors/connect-cluster-15may-308pm/tasks/0/status
{
    "state": "FAILED",
    "trace": "org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n",
    "id": 0,
    "worker_id": "127.0.0.1:8083"
}

最佳答案

您使用的是什么版本的 Kafka Connect?在通过堆栈跟踪确定错误来源时了解这一点会有所帮助。

我认为正在发生的事情是您在值中有数据,但没有在键中。由于您同时拥有 key.convertervalue.converter设置为 JsonConverter并与 schemas.enable=true ,期待看到包含 schema 的信封格式和 payload对彼此而言。但是,我猜你的 key 都是 null .

这是一种相反的问题,如 https://issues.apache.org/jira/browse/KAFKA-3832哪里JsonConverter从不生成真 null值。相反,它总是生成包含预期的可选模式 + null 的信封。有效载荷。在这种情况下,从 Kafka 转换到 Connect 的数据 API 不起作用,因为它期望 key 中的信封格式相同。

您可以通过添加 --property print.key=true 来验证这是问题所在。到您的控制台使用者命令。如果是打印出来null关键,问题是 JsonConverter 无法解码它们。

一个简单的解决方法是使用其他一些 Converter对于不关心的键 null值——无论如何,键中没有数据。 Kafka Connect 附带的一个是 org.apache.kafka.connect.storage.StringConverter .

关于apache-kafka - 即使 json 数据包含架构和有效负载字段,kafka 连接 hdfs 接收器连接器也失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43976874/

相关文章:

hadoop - 将 Spark 的输出合并到一个文件中

apache-kafka - 如何在 Kafka Connect Runtime 中获取任务详细信息(ID 和连接器)

apache-kafka - Kafka 连接号码类型字段

amazon-web-services - Kafka Connect 与 AWS Hadoop 实例的托管

docker - 卡夫卡生产者错误 "1 partitions have leader brokers without a matching listener"

java - Kafka设置从主题读取的最大消息数

docker - Kubernetes - 机器上已存在容器镜像

mysql - 将数据插入 HIVE 表时出错

hadoop - 在作业和任务级别如何在Hadoop中处理输出文件?

apache-kafka - 具有高可用性的 kafka 多数据中心