java - 如何检查所有 Actor 是否完成后

标签 java akka actor

我是 Akka 工具包的新手。我需要在多个文件上运行一个进程,这需要花费大量时间。因此,我为每个文件创建了一个 actor 并开始处理。我在 POJO 类中创建这些参与者,如下所示:

public class ProcessFiles {
    private static final Logger logger = LoggerFactory.getLogger(ProcessFiles.class.getSimpleName());

    public static void main(String[] args) throws IOException, InterruptedException {
        long startTime = System.currentTimeMillis();

        logger.info("Creating actor system");
        ActorSystem system = ActorSystem.create("actor_system");

        Set<String> files = new HashSet<>();
        Stream<String> stringStream = Files.lines(Paths.get(fileName));
        stringStream.forEach(line -> files.addAll(Arrays.asList(line.split(","))));
        List<CompletableFuture<Object>> futureList = new ArrayList<>();

        files.forEach((String file) -> {
            ActorRef actorRef = system.actorOf(Props.create(ProcessFile.class, file));
            futureList.add(PatternsCS.ask(actorRef, file, DEFAULT_TIMEOUT).toCompletableFuture());
        });

        boolean isDone;
        do {
            Thread.sleep(30000);
            isDone = true;
            int count = 0;
            for (CompletableFuture<Object> future : futureList) {
                isDone = isDone & (future.isDone() || future.isCompletedExceptionally() || future.isCancelled());
                if (future.isDone() || future.isCompletedExceptionally() || future.isCancelled()) {
                    ++count;
                }
            }
            logger.info("Process is completed for " + count + " files out of " + files.size() + " files.");
        } while (!isDone);
        logger.info("Process is done in " + (System.currentTimeMillis() - startTime) + " ms");
        system.terminate();
    }
} 

这里,ProcessFile 是参与者类。在调用所有参与者退出程序后,主进程会每隔 30 秒检查所有参与者是否完成。有没有更好的方法来实现这种功能?

最佳答案

我建议再创建一个 actor 来跟踪系统中所有 actor 的终止,并在所有 actor 被杀死时关闭 actor 系统。 所以在你的应用程序中 -

ProcessFile 参与者可以在处理文件后向自己发送毒丸。 WatcherActor 将监视(context.watch(processFileActor)) ProcessFileActor 并维护所有已注册 ProcessFile Actor 的计数。

Actor 终止后,WatcherActor 将收到终止消息。 它将减少计数,当计数达到 0 时,关闭 ActorSystem。

关于java - 如何检查所有 Actor 是否完成后,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48898470/

相关文章:

java - setVisibleItemProperties 无效 VAADIN

Scala Actor - 最糟糕的做法?

scala - 如何在 Play 中通过网络套接字向 Actor 发送消息!框架?

java - 无法将按钮置于右中位置

java - 如何使用 recyclerview 处理评级栏

scala - 我是否需要重复使用相同的 Akka ActorSystem,还是可以在每次需要时创建一个?

scala - 在 Akka/Spray 上限制 HTTP 请求

java - 使用 JUnit 对 akka actor 进行单元测试

sockets - 随后调用Actor Freeze计划

java - 带有 Freemarker 的 Netbeans 模板 : built-in with argument fails