我需要为 kafka 连接创建一个自定义分区器 S3 sink plugin 。 我已经延长了 HourlyPartitioner在使用 kotlin 的自定义类中:
class RawDumpHourlyPartitioner<T> : HourlyPartitioner<T>() {
...
}
并相应地更改了我的连接器配置以使用自定义类:
"partitioner.class": "co.myapp.RawDumpHourlyPartitioner",
然后,我创建了 jar(我们使用 shadow )并将其包含到基于 kafka 连接镜像的自定义 docker 镜像中(镜像版本与我们在项目中使用的依赖项相同):
FROM gradle:6.0-jdk8 as builder
WORKDIR /app
ADD . .
RUN gradle clean shadowJar
FROM confluentinc/cp-kafka-connect:5.3.2
COPY --from=builder /app/build/libs/kafka-processor-0.1-all.jar /usr/share/java/kafka/kafka-processor.jar
当连接器启动时,我收到此错误:
ERROR WorkerSinkTask{id=staging-raw-dump-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.ClassCastException: co.myapp.RawDumpHourlyPartitioner cannot be cast to io.confluent.connect.storage.partitioner.Partitioner
为了仔细检查我已经创建了一个尝试实例化该类的 java 文件,并且它没有抛出任何错误:
import io.confluent.connect.storage.partitioner.Partitioner;
public class InstantiateTest {
public static void main(String[] args) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
Class<? extends Partitioner<?>> partitionerClass =
(Class<? extends Partitioner<?>>) Class.forName("co.myapp.RawDumpHourlyPartitioner");
Partitioner<?> partitioner = partitionerClass.newInstance();
}
}
最佳答案
查看kafka连接guide它说:
A Kafka Connect plugin is simply a set of JAR files where Kafka Connect can find an implementation of one or more connectors, transforms, and/or converters. Kafka Connect isolates each plugin from one another so that libraries in one plugin are not affected by the libraries in any other plugins. This is very important when mixing and matching connectors from multiple providers.
这意味着,由于我使用的是 S3 接收器连接器,因此我必须将带有自定义分区程序的 jar 放在 S3 插件的目录中。
将 jar 文件移动到 /usr/share/java/kafka-connect-s3
解决了问题
在评论中,我提到我的 jar 还包含我们在主 kafka-connect 配置(环境变量)中使用的自定义主题名称策略,在这种情况下,jar 需要位于 /usr/share/java/kafka
文件夹
更新:如cricket_007提到最好将自定义分区程序 jar 放入 /usr/share/java/kafka-connect-storage-common
文件夹中,这是所有其他分区程序所在的位置
关于java - Kafka Connect 无法将自定义存储接收器分区器转换为 Partitioner 接口(interface),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60199073/