java - 使用 FutureTask 实现并发

标签 java multithreading concurrency futuretask

我有这样的服务:

class DemoService {
    Result process(Input in) {
        filter1(in);
        if (filter2(in)) return...
        filter3(in);
        filter4(in);
        filter5(in);
        return ...

    }
}

现在我想要它更快,我发现有些过滤器可以同时启动,而有些过滤器必须等待其他过滤器完成。例如:

filter1--
         |---filter3--
filter2--             |---filter5
          ---filter4--

这意味着:

1.filter1和filter2可以同时启动,filter3和filter4也是如此

2.filter3和filter4必须等待filter2完成

还有一件事:

如果 filter2 返回 true,则“process”方法会立即返回并忽略以下过滤器。

现在我的解决方案是使用 FutureTask:

            // do filter's work at FutureTask
        for (Filter filter : filters) {
            FutureTask<RiskResult> futureTask = new FutureTask<RiskResult>(new CallableFilter(filter, context));
            executorService.execute(futureTask);
        }

        //when all FutureTask are submitted, wait for result
        for(Filter filter : filters) {
            if (filter.isReturnNeeded()) {
                FutureTask<RiskResult> futureTask = context.getTask(filter.getId());
                riskResult = futureTask.get();
                if (canReturn(filter, riskResult)) {
                    returnOk = true;
                    return riskResult;
                }
            }
        }

我的 CallableFilter:

public class CallableFilter implements Callable<RiskResult> {

    private Filter filter;
    private Context context;

    @Override
    public RiskResult call() throws Exception {
        List<Filter> dependencies = filter.getDependentFilters();
        if (dependencies != null && dependencies.size() > 0) {

            //wait for its dependency filters to finish
            for (Filter d : dependencies) {
                FutureTask<RiskResult> futureTask = context.getTask(d.getId());
                futureTask.get();

            }
        }

        //do its own work
        return filter.execute(context);
    }
}

我想知道:

1.在这种情况下使用 FutureTask 是个好主意吗?有更好的解决方案吗?

2.线程上下文切换的开销。

谢谢!

最佳答案

在 Java 8 中你可以使用 CompletableFuture将你的过滤器串联起来。使用 thenApply 和 thenCompose 系列方法来向 CompletableFuture 添加新的异步过滤器 - 它们将在上一步完成后执行。 thenCombine 在两个独立的 CompletableFutures 都完成时合并它们。使用 allOf 等待两个以上的 CompletableFuture 对象的结果。

如果您不能使用 Java 8,那么 Guava ListenableFuture可以这样做,参见 Listenable Future Explained .使用 Guava,您可以等待多个独立运行的过滤器完成 Futures.allAsList - 这也会返回一个 ListenableFuture。

对于这两种方法,我们的想法是,在您声明您的 future 操作、它们之间的相互依赖关系以及它们的线程之后,您会得到一个单一的 Future 对象,它封装了您的最终结果。

编辑:可以通过使用 complete() 方法显式完成 CompletableFuture 或使用 Guava SettableFuture(实现 ListenableFuture)来实现提前返回

关于java - 使用 FutureTask 实现并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28810474/

相关文章:

java - 如何在不同的线程中使用 JProgressBar?

c++ - boost::scoped_lock 不适用于局部静态变量?

java - 将多个并行对象连接到单个列表中

java - 当一个对象要向对象发送消息时,该方法应该放在哪里?

Java 线程 : Excessive CPU Utilization

multithreading - 为什么名称为 "monitor"?

c# - 并发线程可以同时检查同一个对象锁吗?

java - 在 Java 中访问组合的 UIMA Ruta 注释

java - 如何将枚举映射到整数

java - android 7 : NullPointerException on APK installing with Runtime. getRuntime().exec