java - Kafka Connect 无法将自定义存储接收器分区器转换为 Partitioner 接口(interface)

标签 java kotlin apache-kafka apache-kafka-connect

我需要为 kafka 连接创建一个自定义分区器 S3 sink plugin 。 我已经延长了 HourlyPartitioner在使用 kotlin 的自定义类中:

class RawDumpHourlyPartitioner<T> : HourlyPartitioner<T>() {
...
}

并相应地更改了我的连接器配置以使用自定义类:

"partitioner.class": "co.myapp.RawDumpHourlyPartitioner",

然后,我创建了 jar(我们使用 shadow )并将其包含到基于 kafka 连接镜像的自定义 docker 镜像中(镜像版本与我们在项目中使用的依赖项相同):

FROM gradle:6.0-jdk8 as builder
WORKDIR /app
ADD . .
RUN gradle clean shadowJar

FROM confluentinc/cp-kafka-connect:5.3.2

COPY --from=builder /app/build/libs/kafka-processor-0.1-all.jar /usr/share/java/kafka/kafka-processor.jar

当连接器启动时,我收到此错误:

ERROR WorkerSinkTask{id=staging-raw-dump-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.ClassCastException: co.myapp.RawDumpHourlyPartitioner cannot be cast to io.confluent.connect.storage.partitioner.Partitioner

为了仔细检查我已经创建了一个尝试实例化该类的 java 文件,并且它没有抛出任何错误:

import io.confluent.connect.storage.partitioner.Partitioner;

public class InstantiateTest {
    public static void main(String[] args) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        Class<? extends Partitioner<?>> partitionerClass =
                (Class<? extends Partitioner<?>>) Class.forName("co.myapp.RawDumpHourlyPartitioner");

        Partitioner<?> partitioner = partitionerClass.newInstance();
    }
}

最佳答案

查看kafka连接guide它说:

A Kafka Connect plugin is simply a set of JAR files where Kafka Connect can find an implementation of one or more connectors, transforms, and/or converters. Kafka Connect isolates each plugin from one another so that libraries in one plugin are not affected by the libraries in any other plugins. This is very important when mixing and matching connectors from multiple providers.

这意味着,由于我使用的是 S3 接收器连接器,因此我必须将带有自定义分区程序的 jar 放在 S3 插件的目录中。

将 jar 文件移动到 /usr/share/java/kafka-connect-s3 解决了问题

在评论中,我提到我的 jar 还包含我们在主 kafka-connect 配置(环境变量)中使用的自定义主题名称策略,在这种情况下,jar 需要位于 /usr/share/java/kafka 文件夹

更新:如cricket_007提到最好将自定义分区程序 jar 放入 /usr/share/java/kafka-connect-storage-common 文件夹中,这是所有其他分区程序所在的位置

关于java - Kafka Connect 无法将自定义存储接收器分区器转换为 Partitioner 接口(interface),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60199073/

相关文章:

Java lambda 表达式——映射然后修改列表?

java - 可 ScrollView Eclipse 插件

java - 在 cssSelector webdriver 上转义特殊字符

java - 如何在 java applet 中使用 JMF 功能?

java - Toast 适用于 Java Activity ,但不适用于 kotlin

kotlin - 如何在 Kotlin 协程中使用异步缓存?

kotlin - Kotlin中的Jinq-如何将Lambda转换为Java SerializedLambda?

apache-zookeeper - 为什么 Kafka 消费者连接到 Zookeeper,而生产者从 Broker 获取元数据?

java - spring-kafka 使用很长的任务多次处理相同的消息。

java - Kafka Consumer 收到相同的消息