apache-kafka - Kafka Connect - Tolerance exceeded in error handler

Tags: apache-kafka, apache-kafka-connect

I have more than 50 source connectors against SQL Server, and two of them have failed. Could you tell me what the likely cause is? Our access to the Kafka server is limited. Here is the status of one failing connector:

{
    "name": "xxxxxxxxxxxxx",
    "connector": {
        "state": "RUNNING",
        "worker_id": "xxxxxxxxxxxxxx:8083"
    },
    "tasks": [
        {
            "state": "FAILED",
            "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:292)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:228)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: Schema required for [updating schema metadata]\n\tat org.apache.kafka.connect.transforms.util.Requirements.requireSchema(Requirements.java:31)\n\tat org.apache.kafka.connect.transforms.SetSchemaMetadata.apply(SetSchemaMetadata.java:64)\n\tat org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:44)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 11 more\n",
            "id": 0,
            "worker_id": "xxxxxxxxxxxxx:8083"
        }
    ],
    "type": "source"
}
Source connector configuration:
{
"name": "xxxxxxxx",
"config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.history.kafka.topic": "dbhistory.fullfillment.ecom",
    "transforms": "unwrap,setSchemaName",
    "internal.key.converter.schemas.enable": "false",
    "offset.storage.partitons": "2",
    "include.schema.changes": "false",
    "table.whitelist": "dbo.abc",
    "decimal.handling.mode": "double",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms.setSchemaName.schema.name": "com.data.meta.avro.abc",
    "database.dbname": "xxxxxx",
    "database.user": "xxxxxx",
    "database.history.kafka.bootstrap.servers": "xxxxxxxxxxxx",
    "database.server.name": "xxxxxxx",
    "database.port": "xxxxxx",
    "transforms.setSchemaName.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
    "key.converter.schemas.enable": "false",
    "value.converter.schema.registry.url": "http://xxxxxxxxxx:8081",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "database.hostname": "xxxxxxx",
    "database.password": "xxxxxxx",
    "internal.value.converter.schemas.enable": "false",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "name": "xxxxxxxxxxx"
}
}

Best Answer

If you look at the stack trace in the trace field and replace the \n and \t escape sequences with actual newlines and tabs, you will see:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:292)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:228)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Schema required for [updating schema metadata]
    at org.apache.kafka.connect.transforms.util.Requirements.requireSchema(Requirements.java:31)
    at org.apache.kafka.connect.transforms.SetSchemaMetadata.apply(SetSchemaMetadata.java:64)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:44)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 11 more
So the error is thrown by the SetSchemaMetadata single message transform: org.apache.kafka.connect.errors.DataException: Schema required for [updating schema metadata]. I would check the configuration of your connectors, isolate the failing ones, and review their single message transform settings. This issue may be related.
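One likely trigger, given this configuration, is tombstone records: "transforms.unwrap.drop.tombstones" is "false", so Debezium's ExtractNewRecordState passes tombstones through with a null, schema-less value, which SetSchemaMetadata$Value then rejects. On Kafka Connect 2.6+ (KIP-585) you can guard the transform with a predicate so it skips tombstones. The fragment below is a sketch of that approach, not a verified fix for this exact connector; the predicate name isTombstone is arbitrary:

```json
"transforms": "unwrap,setSchemaName",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.setSchemaName.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.setSchemaName.schema.name": "com.data.meta.avro.abc",
"transforms.setSchemaName.predicate": "isTombstone",
"transforms.setSchemaName.negate": "true",
"predicates": "isTombstone",
"predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
```

Alternatively, setting "errors.tolerance": "all" together with "errors.log.enable": "true" would keep the task running past the offending records, at the cost of silently skipping them, which only masks the underlying data issue.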

Regarding apache-kafka - Kafka Connect - Tolerance exceeded in error handler, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/62538379/
