multithreading - 中断并行流执行

标签 multithreading concurrency java-8 java-stream

考虑这段代码:

Thread thread = new Thread(() -> tasks.parallelStream().forEach(Runnable::run));

任务是应该并行执行的 Runnable 列表。
当我们启动这个线程并开始执行时,根据一些计算,我们需要中断(取消)所有这些任务。

中断线程只会停止其中一个执行。我们如何处理他人?或者也许不应该那样使用 Streams?或者您知道更好的解决方案?

最佳答案

您可以使用 ForkJoinPool中断线程:

@Test
public void testInterruptParallelStream() throws Exception {
    final AtomicReference<InterruptedException> exc = new AtomicReference<>();

    final ForkJoinPool forkJoinPool = new ForkJoinPool(4);
    // use the pool with a parallel stream to execute some tasks
    forkJoinPool.submit(() -> {
        Stream.generate(Object::new).parallel().forEach(obj -> {
            synchronized (obj) {
                try {
                    // task that is blocking
                    obj.wait();
                } catch (final InterruptedException e) {
                    exc.set(e);
                }
            }
        });
    });

    // wait until the stream got started
    Threads.sleep(500);
    // now we want to interrupt the task execution
    forkJoinPool.shutdownNow();
    // wait for the interrupt to occur
    Threads.sleep(500);
    // check that we really got an interruption in the parallel stream threads
    assertTrue(exc.get() instanceof InterruptedException);
}

工作线程确实会被中断,从而终止阻塞操作。您也可以调用shutdown()Consumer 内.

请注意,这些 sleep 可能不会针对适当的单元测试进行调整,您可能有更好的想法,只是在必要时等待。但这足以表明它正在发挥作用。

关于multithreading - 中断并行流执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24243559/

相关文章:

java - 如何使用java 8更新匹配条件的列表对象

java - 使用 ExecutorService 有什么好处?

java - java中的线程/同步

java - 是否应该将 Disruptor (LMAX) 与内存和 CQRS 中的大模型一起使用?

java - 将显式和隐式并行与 java-8 流混合

java 8 List<Pair<String, Integer>> 到 List<String>

ios - Swift 中带有 Firestore 的完成处理程序的替代方案

c - 在多线程程序中在哪里定义互斥锁,有什么区别

android - 异步 LiveData/Room 查询会导致竞争条件吗?

java - 测试多线程代码或确保代码是线程安全的指南