我正在开发一个简单的 Kafka Stream 应用程序,它从一个主题中提取消息并在转换后将其放入另一个主题中。我正在使用 Intelij 进行开发。
当我调试/运行此应用程序时,如果我的 IDE 和 Kafka 服务器位于同一台机器 中,它会完美运行
(i.e. with the BOOTSTRAP_SERVERS_CONFIG = localhost:9092 and SCHEMA_REGISTRY_URL_CONFIG = localhost:8081)
但是,当我尝试使用另一台机器进行开发时
(i.e. with the BOOTSTRAP_SERVERS_CONFIG = XXX.XXX.XXX:9092 and SCHEMA_REGISTRY_URL_CONFIG = XXX.XXX.XXX:8081 where XXX.XXX.XXX is the ip address of my Kafka),
调试过程第一次运行没有问题。但是,当我在重置偏移量后第二次运行时,收到以下错误:
ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297)
java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my_application_id\0_0
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException:
如果我将 my_application_id
更改为 my_application_id2
并运行它,它会在第一次运行时再次运行,但如果我再次运行它会再次收到错误。
我在我的应用程序的最后一句话中有以下代码:
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
有什么解决这个问题的建议吗?
更新:
我检查了在我的开发机器(Windows 平台)中创建的状态目录,如果我在第二次运行之前手动删除这些目录,没有发现错误。我试图以管理员身份运行我的 IDE,因为我认为这可能与文件夹的权限有关。但是,这没有帮助。
完整堆栈供引用:
INFO Kafka version : 1.1.0 (org.apache.kafka.common.utils.AppInfoParser:109) INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser:110) INFO stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup. (org.apache.kafka.streams.processor.internals.StateDirectory:281) Disconnected from the target VM, address: '127.0.0.1:16552', transport: 'socket' Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0 at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:231) at org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931) at com.macroviewhk.financialreport.simpleStream.start(simpleStream.java:60) at com.macroviewhk.financialreport.simpleStream.main(simpleStream.java:45) Caused by: java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0 at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266) at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103) at java.nio.file.Files.delete(Files.java:1126) at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:651) at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:634) at java.nio.file.Files.walkFileTree(Files.java:2688) at java.nio.file.Files.walkFileTree(Files.java:2742) at org.apache.kafka.common.utils.Utils.delete(Utils.java:634) ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297) at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287) java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0 at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228) at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266) ... 3 more at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103) at java.nio.file.Files.delete(Files.java:1126) at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:651) at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:634) at java.nio.file.Files.walkFileTree(Files.java:2688) at java.nio.file.Files.walkFileTree(Files.java:2742) at org.apache.kafka.common.utils.Utils.delete(Utils.java:634) at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287) at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228) at org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931) at com.macroviewhk.financialreport.simpleStream.start(simpleStream.java:60) at com.macroviewhk.financialreport.simpleStream.main(simpleStream.java:45)
更新 2: 经过另一次详细检查,下面的行抛出 IOException
Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {
这一行位于 kafka-clients-1.1.0.jar org.apache.kafka.common.utilsUtils.class
可能是Windows系统的问题(抱歉我不是资深JAVA程序员)
最佳答案
对于谷歌员工..
我目前正在使用此 Scala 代码来帮助 Windows 人员处理状态存储的删除。
if (System.getProperty("os.name").toLowerCase.contains("windows")) {
logger.info("WINDOWS OS MODE - Cleanup state store.")
try {
FileUtils.deleteDirectory(new File("/tmp/kafka-streams/" + config.getProperty("application.id")))
FileUtils.forceMkdir(new File("/tmp/kafka-streams/" + config.getProperty("application.id")))
} catch {
case e: Exception => logger.error(e.toString)
}
}
else {
streams.cleanUp()
}
关于java - Kafka Stream Application删除IDE中state目录失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50602512/