java - 设置 STRUCT 类型的默认值时 Kafka Connect API 错误

标签 java apache-kafka apache-kafka-connect

我想设置一个默认值STRUCT,代码如下:

        SchemaBuilder schemaBuilder = SchemaBuilder.struct().name("homeAddress")
                .field("province", SchemaBuilder.STRING_SCHEMA)
                .field("city", SchemaBuilder.STRING_SCHEMA);
        Struct defaultValue = new Struct(schemaBuilder.build())
                .put("province", "aaaa")
                .put("city", "bbbb");
        Schema dataSchema = SchemaBuilder.struct().name("personMessage")
                .field("address", schemaBuilder.defaultValue(defaultValue).build()).build();
        Struct struct = new Struct(dataSchema);

但是我得到了如下错误

Exception in thread "main" org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:251)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
    ... 1 more

我挖掘了 ConnectSchema.validateValue 的代码并找到了抛出异常的原因,

value的schema类型是ConnectSchema,但另一个是SchemaBuilder,则抛出异常。

case STRUCT:
    Struct struct = (Struct) value;
    if (!struct.schema().equals(schema))
        throw new DataException("Struct schemas do not match.");
    struct.validate();

等于方法

        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ConnectSchema schema = (ConnectSchema) o;
        return Objects.equals(optional, schema.optional) &&
                Objects.equals(version, schema.version) &&
                Objects.equals(name, schema.name) &&
                Objects.equals(doc, schema.doc) &&
                Objects.equals(type, schema.type) &&
                Objects.deepEquals(defaultValue, schema.defaultValue) &&
                Objects.equals(fields, schema.fields) &&
                Objects.equals(keySchema, schema.keySchema) &&
                Objects.equals(valueSchema, schema.valueSchema) &&
                Objects.equals(parameters, schema.parameters);

任何人都可以帮助如何设置类型STRUCT的默认值

下面是方法“defaultValue”的代码:

public SchemaBuilder defaultValue(Object value) {
        checkCanSet(DEFAULT_FIELD, defaultValue, value);
        checkNotNull(TYPE_FIELD, type, DEFAULT_FIELD);
        try {
            ConnectSchema.validateValue(this, value);
        } catch (DataException e) {
            throw new SchemaBuilderException("Invalid default value", e);
        }
        defaultValue = value;
        return this;
    }

如果我将 ConnectSchema.validateValue(this, value) 更改为 ConnectSchema.validateValue(this.builder(), value) 似乎就可以了,我不知道其他情况是否可以。

谢谢。

最佳答案

改变这个 Struct defaultValue = new Struct(schemaBuilder.build())
Struct defaultValue = new Struct(schemaBuilder)

ConnectSchemaSchemaBuilder 两个类都实现 Schema schemaBuilder.schemaBuilder.build() 将所有值传递给 ConnectSchema 构造函数并返回新的 Schema 对象,并且该对象是不可变的。

看看这个。

System.out.println(struct); //empty 
System.out.println(struct.get("address"));  //default values
System.out.println(  ((Struct)struct.get("address")).getString("city")   ); //default values
System.out.println(  ((Struct)struct.get("address")).getString("province")   ); //default values

默认值不会重写到结构字段。它们仅存在于架构定义中,如果字段为空,则从架构定义中返回默认值。

来自 Struct.class

  public Object get(Field field) {
        Object val = this.values[field.index()];
        if (val == null && field.schema().defaultValue() != null) {
            val = field.schema().defaultValue();
        }

        return val;
    }

关于java - 设置 STRUCT 类型的默认值时 Kafka Connect API 错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59639356/

相关文章:

java - 使用 java 配置的不存在请求映射的拦截器

java - 关于java片段输出的问题

apache-kafka - Kafka节点之间如何通信?

java - 在反序列化过程中如何在不使用无限循环的情况下编写 kafka 消费者?

mysql - Debezium kafka 连接器未成功更新

apache-kafka - Kafka-MongoDB Debezium 连接器 : distributed mode

java - 需要将数据分为几组,并且每组都不应超过指定数量的重复项

Java文件输入编译后不起作用

spring - 重新平衡后,kafka 停止使用来自新分配分区的消息

spring-boot - Apache Kafka 与 Springboot 连接