java - 使用 WatchService 设置发布者-订阅者

标签 java multithreading

我正在尝试使用 NIO 中的 WatchService 设置 2 路发布者-订阅者。

我对线程没有太多经验,所以如果我没有任何意义,请随时告诉我!

这只是一个了解该库如何工作的示例,但生产代码将监听输入文件中的更改,当文件更改时,它将执行一些计算,然后写入输出文件。该输出文件将由另一个程序读取,并对其运行一些计算。然后将写入输入文件并继续循环。

对于这个测试,我使用观察者创建了 2 个线程,第一个线程监听 first.txt 并写入 second.txt,第二个线程等待 second.txt 并写入 first.txt。我所做的只是增加计数变量并写入每个线程的输出文件。两个线程都对它们真正关心的文件进行阻塞调用和过滤器,所以我认为行为看起来像

两个线程都在等待 take() 调用。 更改 first.txt 以启动该过程 这会触发第一个线程更改 second.txt 然后触发第二个线程更改 first.txt 等等。

至少我是这么希望的。最终结果是线程完全不同步,当我执行此操作并计数到 1000 时,一个线程通常落后 50 多个点。

这是观察者的代码

Watcher(Path input, Path output) throws IOException {
    this.watcher = FileSystems.getDefault().newWatchService();
    this.input = input;
    this.output = output;
    dir = input.getParent();
    dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
}

void watchAndRespond() throws IOException, InterruptedException {

    while (count < 1000) {

        WatchKey key = watcher.take();

        for (WatchEvent<?> event: key.pollEvents()) {
            if (! event.context().equals(input.getFileName())) {
                continue;
            }

            WatchEvent.Kind kind = event.kind();
            if (kind == OVERFLOW) {
                continue;
            }

            count++;

            try (BufferedWriter out = new BufferedWriter(new FileWriter(output.toFile()))) {
                out.write(count + "");
            }
        }
        key.reset();
    }
}

我不想读取文件来确定文件是否已更改,因为生产中的这些文件可能很大。

我觉得这可能太复杂了,我正在尝试通过截肢来治疗膝盖擦伤。我是否错误地使用了这个库?我是否使用了错误的工具来完成这项工作?如果是,是否有其他我可以使用的文件监听库,这样我就不必轮询上次编辑的内容?

编辑: 哎呀,这是我编写的设置两个线程的测试

@Test
public void when_two_watchers_run_together_they_end_up_with_same_number_of_evaluation() throws InterruptedException, IOException {
    //setup
    Path input = environment.loadResourceAt("input.txt").asPath();
    Path output = environment.loadResourceAt("output.txt").asPath();

    if (Files.exists(input)) {
        Files.delete(input);
    }
    if (Files.exists(output)) {
        Files.delete(output);
    }

    Thread thread1 = makeThread(input, output, "watching input");
    Thread thread2 = makeThread(output, input, "watching output");

    //act
    thread1.start();
    thread2.start();

    Thread.sleep(50);

    BufferedWriter out = new BufferedWriter(new FileWriter(input.toFile()));
    out.write(0 + "");
    out.close();

    thread1.join();
    thread2.join();

    int inputResult = Integer.parseInt(Files.readAllLines(input).get(0));
    int outputResult = Integer.parseInt(Files.readAllLines(output).get(0));
    //assert
    assertThat(inputResult).describedAs("Expected is output file, Actual is input file").isEqualTo(outputResult);
}

public Thread makeThread(Path input, Path output, String threadName) {
    return new Thread(() ->
    {
        try {
            new Watcher(input, output).watchAndRespond();
        }
        catch (IOException | InterruptedException e) {
            fail();
        }

    }, threadName);
}

我认为问题在于某些修改将多个事件放入队列中,此时我无法辨别它们是否是由一次保存创建的 2 个事件或 2 个单独的保存创建的事件。

最佳答案

该工具似乎非常正确,但您的代码必须按顺序流动,否则正如您所注意到的,一切都会不同步。 将其视为必须在另一笔交易开始之前完成的交易。 在这种情况下,交易可以归结为 1.) 检测 File1 更改 2.) 修改文件2 3.) 检测 File2 更改 4.) 修改文件1

所以在这个循环完全结束之前如果另一个循环开始的话就会有麻烦。当您使用线程时,调度和执行并不完全可预测,因此您不知道 2 个线程正在做什么。 他们是否按照您的要求按顺序做事。 因此,您必须与任何人共享您的线程代码才能给出特定的 解决方案。

另一点是你可以保留一个包含更改的小更改文件吗 并使用它而不是使用更大的生产文件。那 这样您就可以将焦点减少到较小的对象。

关于java - 使用 WatchService 设置发布者-订阅者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30061479/

相关文章:

java - 我真的需要数据库事务吗?

python - 大量的multiprocessing.Process导致死锁

c# - 为什么在线程中运行异步操作比纯任务或纯线程操作慢得多

multithreading - Delphi简单类型线程安全吗?

c# - RoleEnvironment.RequestRecycle() 不触发重新启动

java - Java HTTP Client 是否处理压缩

java - 在 Virgo 中部署 VI JAVA 抛出 java.lang.NoClassDefFoundError

java - Eclipse (Mars - 4.5.0) 中的 Maven 插件 "mark invalid"

java - 匹配最多 2 位小数的数字的正则表达式

java - 具有屏幕旋转的 AsyncTask - onRetainNonConfigurationInstance 已弃用