java - Kafka Stream Application删除IDE中state目录失败

标签 java windows apache-kafka apache-kafka-streams

我正在开发一个简单的 Kafka Stream 应用程序,它从一个主题中提取消息并在转换后将其放入另一个主题中。我正在使用 Intelij 进行开发。

当我调试/运行此应用程序时,如果我的 IDE 和 Kafka 服务器位于同一台机器 中,它会完美运行

(i.e. with the BOOTSTRAP_SERVERS_CONFIG = localhost:9092 and SCHEMA_REGISTRY_URL_CONFIG = localhost:8081)

但是,当我尝试使用另一台机器进行开发时

(i.e. with the BOOTSTRAP_SERVERS_CONFIG = XXX.XXX.XXX:9092 and SCHEMA_REGISTRY_URL_CONFIG = XXX.XXX.XXX:8081 where XXX.XXX.XXX is the ip address of my Kafka),

调试过程第一次运行没有问题。但是,当我在重置偏移量后第二次运行时,收到以下错误:

ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297) 
java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my_application_id\0_0
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException:

如果我将 my_application_id 更改为 my_application_id2 并运行它,它会在第一次运行时再次运行,但如果我再次运行它会再次收到错误。

我在我的应用程序的最后一句话中有以下代码:

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

有什么解决这个问题的建议吗?

更新:

我检查了在我的开发机器(Windows 平台)中创建的状态目录,如果我在第二次运行之前手动删除这些目录,没有发现错误。我试图以管理员身份运行我的 IDE,因为我认为这可能与文件夹的权限有关。但是,这没有帮助。

完整堆栈供引用:

INFO Kafka version : 1.1.0 (org.apache.kafka.common.utils.AppInfoParser:109) INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser:110) INFO stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup. (org.apache.kafka.streams.processor.internals.StateDirectory:281) Disconnected from the target VM, address: '127.0.0.1:16552', transport: 'socket' Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0 at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:231) at org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931) at com.macroviewhk.financialreport.simpleStream.start(simpleStream.java:60) at com.macroviewhk.financialreport.simpleStream.main(simpleStream.java:45) Caused by: java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0 at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266) at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103) at java.nio.file.Files.delete(Files.java:1126) at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:651) at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:634) at java.nio.file.Files.walkFileTree(Files.java:2688) at java.nio.file.Files.walkFileTree(Files.java:2742) at org.apache.kafka.common.utils.Utils.delete(Utils.java:634) ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297) at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287) java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0 at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228) at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266) ... 3 more at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103) at java.nio.file.Files.delete(Files.java:1126) at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:651) at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:634) at java.nio.file.Files.walkFileTree(Files.java:2688) at java.nio.file.Files.walkFileTree(Files.java:2742) at org.apache.kafka.common.utils.Utils.delete(Utils.java:634) at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287) at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228) at org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931) at com.macroviewhk.financialreport.simpleStream.start(simpleStream.java:60) at com.macroviewhk.financialreport.simpleStream.main(simpleStream.java:45)

更新 2: 经过另一次详细检查,下面的行抛出 IOException

Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {

这一行位于 kafka-clients-1.1.0.jar org.apache.kafka.common.utilsUtils.class

可能是Windows系统的问题(抱歉我不是资深JAVA程序员)

最佳答案

对于谷歌员工..

我目前正在使用此 Scala 代码来帮助 Windows 人员处理状态存储的删除。

if (System.getProperty("os.name").toLowerCase.contains("windows")) {
  logger.info("WINDOWS OS MODE - Cleanup state store.")
  try {
    FileUtils.deleteDirectory(new File("/tmp/kafka-streams/" + config.getProperty("application.id")))
    FileUtils.forceMkdir(new File("/tmp/kafka-streams/" + config.getProperty("application.id")))
  } catch {
    case e: Exception => logger.error(e.toString)
  }
}
else {
  streams.cleanUp()
}

关于java - Kafka Stream Application删除IDE中state目录失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50602512/

相关文章:

java - 有没有办法将数据附加到 Bukkit ItemStack?

Java:动态更改 String[][]

windows - 最小化应用程序时隐藏表单

java - 如何使用 Java API 添加 SCRAM-SHA-512 kafka 配置?

java - MyBatis 问题与 IN 条件 <foreach with List inside a Map

java - 将变量从操作类 (Struts 2) 传递到 JSP

windows - TortoiseGit 错误 - 无法获取所有引用。 libgit2 返回 : corrupted loose reference file

windows - PhoneGap错误: EPERM: Operation not permitted (windows 10)

python - 如何使用 confluent-kafka-python 确定是否存在 kafka 主题

hadoop - 将日志文件kafka移动到hadoop