Avro schema:
{
  "type": "record",
  "name": "Envelope",
  "namespace": "test",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "id",
              "type": {
                "type": "long",
                "connect.default": 0
              },
              "default": 0
            },
            {
              "name": "createdAt",
              "type": [
                "null",
                {
                  "type": "string",
                  "connect.version": 1,
                  "connect.name": "io.debezium.time.ZonedTimestamp"
                }
              ],
              "default": null
            }
          ],
          "connect.name": "test.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    }
  ],
  "connect.name": "test.Envelope"
}
S3 sink connector configuration:
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "behavior.on.null.values": "ignore",
  "s3.region": "us-west-2",
  "flush.size": "1",
  "tasks.max": "3",
  "timezone": "UTC",
  "locale": "US",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "aws.access.key.id": "---",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "s3.bucket.name": "test-s3-sink-created-at-partition",
  "partition.duration.ms": "1000",
  "topics.regex": "test_topic",
  "aws.secret.access.key": "---",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "value.converter.schemas.enable": "false",
  "name": "s3-sink-created-at-partition",
  "errors.tolerance": "all",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "path.format": "YYYY/MM/dd",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "createdAt"
}
Error:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:591)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: The field 'createdAt' does not exist in
Caused by: org.apache.kafka.connect.errors.DataException: Unable to find nested field 'createdAt'
The problem:
I am trying to use the sink connector above to move data from the test topic into an S3 bucket, partitioned by the createdAt field, but it keeps throwing the createdAt errors shown above. Also, the S3 bucket is not created by this configuration. Please advise.
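To see why the extractor fails, note the field layout implied by the schema above: createdAt is not a top-level field of the envelope but sits inside the after (or before) sub-record. A minimal Python sketch (the get_nested helper is hypothetical, not part of Kafka Connect) of the lookup the partitioner is effectively doing:

```python
def get_nested(record, dotted_path):
    """Walk a dict along a dot-separated path; return None if any hop is missing."""
    current = record
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current

# Shape of one envelope record as JsonConverter (schemas.enable=false) emits it:
envelope = {
    "before": None,
    "after": {"id": 1, "createdAt": "2022-06-28T10:00:00Z"},
}

print(get_nested(envelope, "createdAt"))        # None -> the connector's "does not exist" error
print(get_nested(envelope, "after.createdAt"))  # "2022-06-28T10:00:00Z"
```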
Best answer
You should be able to use after.Value.createdAt
- See my PR
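If your Connect / S3-sink version supports dotted paths in the RecordField extractor (the PR referenced above), the change is limited to the timestamp settings. Whether the path includes the intermediate Value name depends on that version, so treat this fragment as a sketch:

```
"timestamp.extractor": "RecordField",
"timestamp.field": "after.Value.createdAt"
```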
But the better option is to unwrap the envelope, as you asked.
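Unwrapping is typically done with Debezium's ExtractNewRecordState single message transform, which replaces the envelope with the contents of the after record, so the existing "timestamp.field": "createdAt" then resolves at the top level. A sketch of the extra connector properties (property names as documented by Debezium; "unwrap" is just an alias you choose):

```
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true"
```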
"s3-bucket is not created using above configuration"

You need to create the bucket ahead of time; the S3 sink connector writes to an existing bucket and does not create one for you.
Regarding amazon-s3 - how to extract a nested field from an envelope-type schema in an S3 sink connector, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/72786136/