java - 如何使用 Kafka 1.1.0 以编程方式创建主题

标签 java apache-kafka

我最近升级到了 Kafka 1.1.0。我正在尝试为 kafka 消费者创建单元测试。为此,如果单元测试可以创建用于测试的主题,那将是理想的选择。我发现一些代码看起来应该可以实现我想要的功能。但是,当我运行它时,它会抛出异常: java.lang.NoSuchMethodError: org.apache.kafka.common.utils.Utils.closeQuietly(Ljava/lang/Au​​toCloseable;Ljava/lang/String;)V

这是我在网上找到的创建主题的代码:

@BeforeClass
public static void createTopic() {
   try (final AdminClient adminClient = AdminClient.create(configure())) {
        try {
            // Define topic
            NewTopic newTopic = new NewTopic("test-orders", 1, (short)1);

            // Create topic, which is async call.
            final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));

            // Since the call is Async, Lets wait for it to complete.
            createTopicsResult.values().get(ordersTopic).get();
        } catch (InterruptedException | ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    }
}

但是当我运行它时它抛出异常。

java.lang.NoSuchMethodError: org.apache.kafka.common.utils.Utils.closeQuietly(Ljava/lang/AutoCloseable;Ljava/lang/String;)V
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:334)
at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:52)
at com.sial.notifications.topics.OrdersTopicsTests.createTopic(OrdersTopicsTests.java:162)

我传递给它的唯一配置参数是引导服务器和 client.id。 我究竟做错了什么?看起来很简单

最佳答案

当我针对 1.1.0 代理单独运行时,这个稍微修改过的代码对我有用:

public static void main(String[] args) {
    final String ordersTopic = "test-orders";
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    try (final AdminClient adminClient = AdminClient.create(props)) {
        try {
            // Define topic
            NewTopic newTopic = new NewTopic(ordersTopic, 1, (short)1);

            // Create topic, which is async call.
            final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));

            // Since the call is Async, Lets wait for it to complete.
            createTopicsResult.values().get(ordersTopic).get();
        } catch (InterruptedException | ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException))
                throw new RuntimeException(e.getMessage(), e);
        }
    }
}

由于这与您的代码非常相似,并且根据您看到的错误,也许您还没有完全理清对 Kafka 库的依赖关系?我使用了 Maven 工件 org.apache.kafka:kafka_2.12:1.1.0

关于java - 如何使用 Kafka 1.1.0 以编程方式创建主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51274085/

相关文章:

apache-kafka - 有时,发布到 kafka 的消息会丢失

java.lang.UnsatisfiedLinkError : org. opencv.imgcodecs.Imgcodecs.imread_0(Ljava/lang/String;I)J

java - 如何从java中的jdatechooser知道所选日期是星期几

mysql - Kafka Connect JDBC 连接器查询 + 在初始轮询时使用大数据集增加模式阻塞

docker - 为什么kafka docker需要监听unix套接字

apache-kafka - 有没有办法对 Kafka 流中的输入主题进行重新分区?

java - 如何在轴上投影多边形

java - 500 错误 - 使用 AcrCloud RESTful api 上传音频文件

java - 如何查看并记录URL重定向?

java - 无法向消息中心发布消息