spring-boot - kafka嵌入: java. io.FileNotFoundException :/tmp/kafka-7785736914220873149/replication-offset-checkpoint. tmp

标签 spring-boot cassandra spring-kafka

我在集成测试中使用 kafkaEmbedded 并得到 FileNotFoundException :

java.io.FileNotFoundException: /tmp/kafka-7785736914220873149/replication-offset-checkpoint.tmp 
at java.io.FileOutputStream.open0(Native Method) ~[na:1.8.0_141]
at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[na:1.8.0_141]
at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[na:1.8.0_141]
at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[na:1.8.0_141]
at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:43) ~[kafka_2.11-0.11.0.0.jar:na]
at kafka.server.checkpoints.OffsetCheckpointFile.write(OffsetCheckpointFile.scala:58) ~[kafka_2.11-0.11.0.0.jar:na]
at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:1118) [kafka_2.11-0.11.0.0.jar:na]
at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:1115) [kafka_2.11-0.11.0.0.jar:na]
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) [scala-library-2.11.11.jar:na]
at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) [scala-library-2.11.11.jar:na]
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) [scala-library-2.11.11.jar:na]
at kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1115) [kafka_2.11-0.11.0.0.jar:na]
at kafka.server.ReplicaManager$$anonfun$1.apply$mcV$sp(ReplicaManager.scala:211) [kafka_2.11-0.11.0.0.jar:na]
at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110) [kafka_2.11-0.11.0.0.jar:na]
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:57) [kafka_2.11-0.11.0.0.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_141]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_141]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_141]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_141]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_141]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_141]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_141]

我的测试成功通过,但在构建结束时出现此错误

经过数小时的研究,我发现了这一点:
  • kafka TestUtils.tempDirectory 方法用于为嵌入式 kafka 代理创建临时目录。它还注册了关闭钩子(Hook),当 JVM 退出时删除这个目录。
  • 当单元测试完成执行时,它会调用 System.exit,它会依次执行所有已注册的关闭 Hook

  • 如果 kafka 代理在单元测试结束时运行,它将尝试在已删除的目录中写入/读取数据并产生不同的 FileNotFound 异常。

    我的配置类:
    @Configuration
    public class KafkaEmbeddedConfiguration {
    
    private final KafkaEmbedded kafkaEmbedded;
    
    public KafkaEmbeddedListenerConfigurationIT() throws Exception {
        kafkaEmbedded = new KafkaEmbedded(1, true, "topic1");
        kafkaEmbedded.before();
    }
    
    @Bean
    public KafkaTemplate<String, Message> sender(ProtobufSerializer protobufSerializer,
            KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) throws Exception {
        KafkaTemplate<String, Message> sender = KafkaTestUtils.newTemplate(kafkaEmbedded, new StringSerializer(),
                protobufSerializer);
    for (MessageListenerContainer listenerContainer : 
    registry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(listenerContainer, 
    kafkaEmbedded.getPartitionsPerTopic());
        }        
    
        return sender;
    }
    

    测试类:
    @RunWith(SpringRunner.class)
    public class DeviceEnergyKafkaListenerIT {
     ...
    @Autowired
    private KafkaTemplate<String, Message> sender;
    
    @Test
    public void test (){
        ...
        sender.send(topic, msg);
        sender.flush();
    }
    

    有什么想法可以解决这个问题吗?

    最佳答案

    @ClassRule经纪人,添加 @AfterClass方法...

    @AfterClass
    public static void tearDown() {
        embeddedKafka.getKafkaServers().forEach(b -> b.shutdown());
        embeddedKafka.getKafkaServers().forEach(b -> b.awaitShutdown());
    }
    

    对于 @Rule或 bean,使用 @After方法。

    关于spring-boot - kafka嵌入: java. io.FileNotFoundException :/tmp/kafka-7785736914220873149/replication-offset-checkpoint. tmp,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49006479/

    相关文章:

    java - 自动生成表格

    java - Spring Boot : Kafka health indicator

    java - 在 Spring Boot 中使用 MongoTemplate 检查 MongoDB 连接

    java - Spring Boot MVC 中如何更新数据

    postgresql - 在 spring boot 中使用 JDBC 和 spring data jpa 为我的 session spring 创建单独的数据源

    java - 您可以在多个线程上启动 Spring Boot Rest 应用程序吗?

    java - Spring Boot 更新数据

    database - cassandra - 如何执行表查询?

    Cassandra 物化 View 影响

    Spring Kafka 和事务