java - Kafka JDBC 连接器中的自定义分区分配

标签 java apache-kafka apache-kafka-connect

我有一个用例,我需要编写自定义逻辑以根据消息中的某些关键参数分配分区。我对此做了一些研究,发现 kafka 转换支持覆盖 Transformation 接口(interface)中的一些方法,但我无法在 git hub 或其他地方做一些示例代码。有人可以分享示例代码或 git hub 链接以在 kafka JDBC 源连接器中执行自定义分区分配吗?

提前致谢!

最佳答案

Kafka Connect 默认使用分配分区:DefaultPartitioner (org.apache.kafka.clients.producer.internals.DefaultPartitioner)

如果您需要用一些自定义覆盖默认连接器,这是可能的,但您必须记住,覆盖适用于所有源连接器。 为此,您必须设置 producer.partitioner.class 属性,例如 producer.partitioner.class=com.example.CustomPartitioner。 此外,您必须使用分区程序将 jar 复制到包含 Kafka Connect 库的目录。

转化方式:

在 Transformation 中也可以设置分区,但这不是正确的做法。 在 Transformation 中,您无权访问主题元数据,这对于分配分区至关重要。

如果无论如何你想为你的记录设置分区,代码应该是这样的:

public class AddPartition <R extends ConnectRecord<R>> implements Transformation<R> {

    public static final ConfigDef CONFIG_DEF = new ConfigDef();

    @Override
    public void configure(Map<String, ?> props) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    }

    @Override
    public R apply(R record) {
        return record.newRecord(record.topic(), calculatePartition(record), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
    }

    private Integer calculatePartition(R record) {
        // Partitions calcuation based on record information
        return 0;
    }

    @Override
    public void close() {
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }
}

关于java - Kafka JDBC 连接器中的自定义分区分配,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55188508/

相关文章:

鼠标移动时忽略 Java 鼠标事件?

java - 局部变量是多余的

ssl - 在 Dataflow 上运行的 Apache Beam 管道无法从 KafkaIO : SSL handshake failed 读取

oracle - Kafka SMT ValueToKey - 如何使用多个值作为键?

mongodb - kafka 连接器 debezium mongodb CDC 更新/$set 消息没有过滤器(_id 值)

java - 如何在 spring security 中重定向访问被拒绝的页面?

java - 在java中添加两个char数据类型

apache-kafka - Kafka Consumer poll() 方法被阻塞

java - 谁负责偏移维护?

java - Kafka Streams 表转换