java - 自定义任务执行器或 "am I reinventing the wheel?"

标签 java multithreading concurrency java-8 java-stream

我正在尝试创建一个实用程序类,它将能够处理可运行程序包并以不同的组合(同步、异步)执行它们。

例如:想象这是一个类似 json 的复合任务表示。 “[]”保持异步任务,“{}”保持同步任务。

[
    task,
    [task, task, task],
    {task, task, [task, task]}
]

这是(将)以方法链接方式实现:

fromAsyncTasks(
    from(runnable),
    fromSyncedTasks(from(runnable), from(runnable), from(runnable)),
    fromAsyncTasks(from(runnable), from(runnable), fromAsyncTasks(from(runnable), from(runnable)))
).execute();

正如您可能理解的那样,.execute() 正在以同步或异步方式递归调用其他任务的execute()。此外,任务接口(interface)支持终止()来停止(中断任务),因此如果我保留对这些任务的引用,我将能够终止它们。

所以问题#1是:是否有任何工具、库至少提供此功能?

和#2:如果我使用流进行并行(异步情况)执行,如何终止它们?

tasks.parallelStream().forEach(Runnable::run)

最佳答案

自 Java 8 起,类 java.util.concurrent.CompletableFuture是标准库的一部分。该类基本上提供了构建您想要的内容所需的一切。

为了直接支持您的用例,您需要某种抽象,即Runnable、同步任务列表或异步任务列表。在下面的代码中,这个抽象是类Task:

interface Task {
    CompletableFuture<Void> execute(
        CompletableFuture<Void> f, Executor e);
}

以下是创建三种不同类型任务的三种工厂方法:

static Task wrap(Runnable runnable) {
    return (f, e) -> f.thenRunAsync(runnable, e);
}
static Task sync(Task... tasks) {
    return (f, e) -> {
        for (Task task : tasks) {
            f = task.execute(f, e);
        }
        return f;
    };
}
static Task async(Task... tasks) {
    return (f, e) -> tasks.length == 0 ? f :
        CompletableFuture.allOf(
        Arrays.stream(tasks)
        .map(t -> t.execute(f, e))
        .toArray(CompletableFuture[]::new));
}

现在,您可以轻松创建可运行、同步和异步任务的任意嵌套结构,并使用任意执行器执行它们。这是一个简短的示例:

public static void main(String... args) {
    Task task = sync(
        wrap(() -> log("1")),
        async(
            wrap(() -> log("A")),
            wrap(() -> log("B")),
            wrap(() -> log("C"))),
        wrap(() -> log("2")),
        wrap(() -> log("3")));
    ExecutorService executor = Executors.newFixedThreadPool(4);
    task.execute(CompletableFuture.completedFuture(null), executor).join();
    executor.shutdown();
}

关于java - 自定义任务执行器或 "am I reinventing the wheel?",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24243170/

相关文章:

java - ElasticSearch post_filter Java API问题

java - 在鼠标单击或触摸时将 box2d 主体隐藏

java - 并发编程是更加网格化还是更加集群化?

java - Java多线程网络爬虫中控制线程数量和对象访问

concurrency - 如果并行处理,为什么在无限的数字流中按素性过滤会永远持续下去?

c# - 限制并发线程等于处理器数量?

java - 关于在 Java 中调用构造函数和定义变量的问题(作业)

java - 一个最小的 Java 程序应该是什么样子的?

c++ - 如何使用带多线程的 SAPI 将文本转换为 Wave?

java - thread.start() 和 executor.submit(thread) 之间的区别