我正在尝试使用 NIO 中的 WatchService 设置 2 路发布者-订阅者。
我对线程没有太多经验,所以如果我没有任何意义,请随时告诉我!
这只是一个了解该库如何工作的示例,但生产代码将监听输入文件中的更改,当文件更改时,它将执行一些计算,然后写入输出文件。该输出文件将由另一个程序读取,并对其运行一些计算。然后将写入输入文件并继续循环。
对于这个测试,我使用观察者创建了 2 个线程,第一个线程监听 first.txt
并写入 second.txt
,第二个线程等待 second.txt
并写入 first.txt
。我所做的只是增加计数变量并写入每个线程的输出文件。两个线程都对它们真正关心的文件进行阻塞调用和过滤器,所以我认为行为看起来像
两个线程都在等待 take() 调用。
更改 first.txt
以启动该过程
这会触发第一个线程更改 second.txt
然后触发第二个线程更改 first.txt
等等。
至少我是这么希望的。最终结果是线程完全不同步,当我执行此操作并计数到 1000 时,一个线程通常落后 50 多个点。
这是观察者的代码
Watcher(Path input, Path output) throws IOException {
this.watcher = FileSystems.getDefault().newWatchService();
this.input = input;
this.output = output;
dir = input.getParent();
dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
}
void watchAndRespond() throws IOException, InterruptedException {
while (count < 1000) {
WatchKey key = watcher.take();
for (WatchEvent<?> event: key.pollEvents()) {
if (! event.context().equals(input.getFileName())) {
continue;
}
WatchEvent.Kind kind = event.kind();
if (kind == OVERFLOW) {
continue;
}
count++;
try (BufferedWriter out = new BufferedWriter(new FileWriter(output.toFile()))) {
out.write(count + "");
}
}
key.reset();
}
}
我不想读取文件来确定文件是否已更改,因为生产中的这些文件可能很大。
我觉得这可能太复杂了,我正在尝试通过截肢来治疗膝盖擦伤。我是否错误地使用了这个库?我是否使用了错误的工具来完成这项工作?如果是,是否有其他我可以使用的文件监听库,这样我就不必轮询上次编辑的内容?
编辑: 哎呀,这是我编写的设置两个线程的测试
@Test
public void when_two_watchers_run_together_they_end_up_with_same_number_of_evaluation() throws InterruptedException, IOException {
//setup
Path input = environment.loadResourceAt("input.txt").asPath();
Path output = environment.loadResourceAt("output.txt").asPath();
if (Files.exists(input)) {
Files.delete(input);
}
if (Files.exists(output)) {
Files.delete(output);
}
Thread thread1 = makeThread(input, output, "watching input");
Thread thread2 = makeThread(output, input, "watching output");
//act
thread1.start();
thread2.start();
Thread.sleep(50);
BufferedWriter out = new BufferedWriter(new FileWriter(input.toFile()));
out.write(0 + "");
out.close();
thread1.join();
thread2.join();
int inputResult = Integer.parseInt(Files.readAllLines(input).get(0));
int outputResult = Integer.parseInt(Files.readAllLines(output).get(0));
//assert
assertThat(inputResult).describedAs("Expected is output file, Actual is input file").isEqualTo(outputResult);
}
public Thread makeThread(Path input, Path output, String threadName) {
return new Thread(() ->
{
try {
new Watcher(input, output).watchAndRespond();
}
catch (IOException | InterruptedException e) {
fail();
}
}, threadName);
}
我认为问题在于某些修改将多个事件放入队列中,此时我无法辨别它们是否是由一次保存创建的 2 个事件或 2 个单独的保存创建的事件。
最佳答案
该工具似乎非常正确,但您的代码必须按顺序流动,否则正如您所注意到的,一切都会不同步。 将其视为必须在另一笔交易开始之前完成的交易。 在这种情况下,交易可以归结为 1.) 检测 File1 更改 2.) 修改文件2 3.) 检测 File2 更改 4.) 修改文件1
所以在这个循环完全结束之前如果另一个循环开始的话就会有麻烦。当您使用线程时,调度和执行并不完全可预测,因此您不知道 2 个线程正在做什么。 他们是否按照您的要求按顺序做事。 因此,您必须与任何人共享您的线程代码才能给出特定的 解决方案。
另一点是你可以保留一个包含更改的小更改文件吗 并使用它而不是使用更大的生产文件。那 这样您就可以将焦点减少到较小的对象。
关于java - 使用 WatchService 设置发布者-订阅者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30061479/