java - Akka Future 线程永远不会被释放

标签 java multithreading akka

我正在尝试并行执行一些繁重的计算。为此,我使用 Akka(Java 语言)。

按照文档的建议,我将计算包装到 Future 中,而不是定义参与者。

计算正确地并行执行,但问题是为 Future<> 打开的线程永远不会关闭,在某些时候我遇到错误:OutOfMemory: unable to create new native thread

我的代码结构如下所示:

public void compute(){
    for(Attribute attribute : attributes){
        computeAttribute(attribute);
    }
}

private void computeAttribute(Attribute attribute){
    ActorSystem system = ActorSystem.create("System");
    int nb = findNumberOfIterations(attribute);

    List<Future<AttributeResult>> answers = new ArrayList<>();

    // Computation to be performed in parallel
    for (int i = 0; i < nb; i++) {
        // MasterCaller contains the heavy computation logic
        Future<AttributeResult> f = Futures.future(new MasterCaller(attribute, i), system.dispatcher());
        answers.add(f);
    }

    Future<Iterable<AttributeResult>> futures = Futures.sequence(answers, system.dispatcher());

    Future<AttributeResult> futureTotal = futures.map(new MasterMapper(), system.dispatcher());

    try {
        // Additional processing step after all resulted have been computed
        AttributeResult value = Await.result(futureTotal, Duration.create(1, TimeUnit.SECONDS));
        postProcess(value, attribute);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

假设每次 computeAttribute() 大约创建 10 个线程叫做。那么如果attributes中有1000个元素列表中,代码将继续创建 10 000 个 Activity 线程!

我真的很感激能得到一些帮助,因为这个问题使得 Akka 的使用和并行计算变得不可能。

最佳答案

问题在于,您正在为每次调用computeAttribute创建一个新的ActorSystem,并且您从未将其关闭。由于您在 Akka 中没有使用任何内容,因此我建议您的computeAttribute方法采用scala.concurrent.ExecutionContext作为参数,然后您可以传入您通过 ExecutionContext.fromExecutorServiceExecutionContext.fromExecutor 自己创建的一个(您仍然需要在适当的时间点关闭它们)或传入 ExecutionContext.global().

关于java - Akka Future 线程永远不会被释放,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34858518/

相关文章:

scala - Akka actor并发问题

java - 如何保证一段代码在多线程环境下只执行一次?

java - 如何在java中使用比较器来比较diff对象值

multithreading - IdHTTP获取响应线程

C#线程数组

scala - Akka-Http 中的实体是什么?

java - 将路径保存在字符串中

java - 使用 OR 运算符进行 boolean 赋值

java - 了解 Stream API ForEach Task 中的主循环

java - 使用连接池后仍然出现 You can't run on a Closed ResultSet 错误