mongodb - Debezium MongoDB connector error: org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler

Tags: mongodb apache-kafka apache-kafka-connect confluent-schema-registry debezium

I am trying to deploy a new Debezium connector for MongoDB with a transform. The configuration looks like this:

{
  "name": "mongo_source_connector_autostate",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": 1,
    "initial.sync.max.threads": 4,
    "mongodb.hosts": "rs0/FE0VMC1980:27017",
    "mongodb.name": "mongo",
    "collection.whitelist": "DASMongoDB.*_AutoState",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope",
    "transforms.sanitize.field.names": true
  }
}

However, the connector fails with the following error:

 org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:290)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:316)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.avro.SchemaParseException: Illegal initial character: 10019_AutoState
        at org.apache.avro.Schema.validateName(Schema.java:1528)
        at org.apache.avro.Schema.access$400(Schema.java:87)
        at org.apache.avro.Schema$Name.<init>(Schema.java:675)
        at org.apache.avro.Schema.createRecord(Schema.java:212)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:893)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:732)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:726)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:365)
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
        at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:62)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:290)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 11 more

I started the connector in distributed mode with the following configuration:

...
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
...

Note: I have another connector without any transforms, and it runs fine.

I would appreciate some help with this issue. Thanks in advance.

Best Answer

It looks like one of your names violates the Avro naming rules. In your case, it is this one:

The name portion of a fullname, record field names, and enum symbols must:

  • start with [A-Za-z_]

However, 10019_AutoState breaks that rule because it starts with a digit. You could rename it to something like AutoState10019.


You can find the complete list of record-field naming constraints here.
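The rule above can be sketched as a small check. This is an illustrative Python sketch of the Avro name constraint, not the actual Java code in `org.apache.avro.Schema.validateName`; the `sanitize_avro_name` helper is a hypothetical example of one possible renaming scheme, not necessarily what Debezium's `sanitize.field.names` option does internally.

```python
import re

# Avro names must start with [A-Za-z_] and contain only [A-Za-z0-9_].
AVRO_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

def is_valid_avro_name(name: str) -> bool:
    """Return True if the name satisfies the Avro naming rule."""
    return bool(AVRO_NAME.fullmatch(name))

def sanitize_avro_name(name: str) -> str:
    """Hypothetical sanitizer: replace illegal characters with '_' and
    prefix an underscore if the name starts with a digit."""
    name = re.sub(r"[^A-Za-z0-9_]", "_", name)
    if name and name[0].isdigit():
        name = "_" + name
    return name

print(is_valid_avro_name("10019_AutoState"))  # False -> SchemaParseException in Avro
print(is_valid_avro_name("AutoState10019"))   # True
print(sanitize_avro_name("10019_AutoState"))  # _10019_AutoState
```

Running the check against `10019_AutoState` fails exactly as the stack trace shows, because the name begins with a digit rather than a letter or underscore.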

A similar question about this Debezium MongoDB connector error ("Tolerance exceeded in error handler") can be found on Stack Overflow: https://stackoverflow.com/questions/61501681/
