amazon-s3 - How to extract a nested field from an Envelope-type schema in an S3 sink connector

Tags: amazon-s3 apache-kafka apache-kafka-connect confluent-schema-registry s3-kafka-connector

Avro schema:

{
  "type": "record",
  "name": "Envelope",
  "namespace": "test",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "id",
              "type": {
                "type": "long",
                "connect.default": 0
              },
              "default": 0
            },
            {
              "name": "createdAt",
              "type": [
                "null",
                {
                  "type": "string",
                  "connect.version": 1,
                  "connect.name": "io.debezium.time.ZonedTimestamp"
                }
              ],
              "default": null
            }
          ],
          "connect.name": "test.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    }
  ],
  "connect.name": "test.Envelope"
}
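The error becomes clear when you walk the schema: `createdAt` is not a top-level field of the `Envelope` record, it lives one level down inside the nested `Value` record. A small sketch using only the Python standard library (with the schema abbreviated to the fields that matter here) illustrates why `"timestamp.field": "createdAt"` cannot be resolved:

```python
import json

# The Envelope schema from the question, abbreviated to the parts
# relevant to the timestamp extractor.
schema = json.loads("""
{
  "type": "record",
  "name": "Envelope",
  "fields": [
    {"name": "before",
     "type": ["null",
              {"type": "record", "name": "Value",
               "fields": [{"name": "id", "type": "long"},
                          {"name": "createdAt",
                           "type": ["null", "string"],
                           "default": null}]}],
     "default": null},
    {"name": "after", "type": ["null", "Value"], "default": null}
  ]
}
""")

# Top-level field names: only "before" and "after" exist, which is why
# a flat "timestamp.field": "createdAt" fails with
# "Unable to find nested field 'createdAt'".
top_level = [f["name"] for f in schema["fields"]]
print(top_level)  # ['before', 'after']

# "createdAt" only appears one level down, inside the nested Value record
# (the second branch of the ["null", Value] union on "before").
value_record = schema["fields"][0]["type"][1]
nested = [f["name"] for f in value_record["fields"]]
print(nested)  # ['id', 'createdAt']
```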

S3 sink connector configuration:

{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "behavior.on.null.values": "ignore",
  "s3.region": "us-west-2",
  "flush.size": "1",
  "tasks.max": "3",
  "timezone": "UTC",
  "locale": "US",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "aws.access.key.id": "---",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "s3.bucket.name": "test-s3-sink-created-at-partition",
  "partition.duration.ms": "1000",
  "topics.regex": "test_topic",
  "aws.secret.access.key": "---",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "value.converter.schemas.enable": "false",
  "name": "s3-sink-created-at-partition",
  "errors.tolerance": "all",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "path.format": "YYYY/MM/dd",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "createdAt"
}

Error:

    org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:591)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: The field 'createdAt' does not exist in
   Caused by: org.apache.kafka.connect.errors.DataException: Unable to find nested field 'createdAt'

Problem:

Currently I am trying to use the sink connector above to move data from the test topic into the S3 bucket, partitioned by the createdAt field, but it keeps throwing the error above about the createdAt field. In addition, the S3 bucket is not created with the above configuration. Please advise.

Best answer

You should be able to use after.Value.createdAt - See my PR
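Concretely, that would change only the two extractor settings in the sink configuration above. The dotted-path syntax is taken from the answer; verify it against your Connect / S3 connector version, since nested-field support for the RecordField extractor was only added via the referenced PR:

```json
{
  "timestamp.extractor": "RecordField",
  "timestamp.field": "after.Value.createdAt"
}
```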

But the better option is to unwrap the envelope, as you were asking.
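Since this is a Debezium envelope, unwrapping is typically done with Debezium's ExtractNewRecordState single message transform, which replaces the Envelope with the contents of its `after` field so that `createdAt` becomes a top-level field. A minimal sketch, assuming the Debezium SMT jar is on the Connect worker's plugin path (add these keys to the sink config above):

```json
{
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "createdAt"
}
```

With the envelope unwrapped, the flat `"timestamp.field": "createdAt"` from your original configuration would resolve as intended.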

s3-bucket is not created using above configuration

You need to create the bucket ahead of time.
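The S3 sink connector does not create buckets; it only writes objects into an existing one. For example, with the AWS CLI (bucket name and region taken from the configuration above):

```shell
# Create the bucket in the same region the connector is configured for.
aws s3api create-bucket \
  --bucket test-s3-sink-created-at-partition \
  --region us-west-2 \
  --create-bucket-configuration LocationConstraint=us-west-2
```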

Regarding "amazon-s3 - How to extract a nested field from an Envelope-type schema in an S3 sink connector", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/72786136/
