Java ExecutorService invokeAll() 中断

标签 java multithreading concurrency executorservice

我有一个宽度为 10 的固定线程池 ExecutorService,以及 100 个 Callable 的列表,每个等待 20 秒并记录它们的中断。

我在一个单独的线程中对该列表调用 invokeAll,并且几乎立即中断了该线程。 ExecutorService执行如预期中断,但Callable记录的实际中断次数远超预期10次——20-40次左右。为什么会这样,如果 ExecutorService 可以同时执行不超过 10 个线程?

完整源代码:(由于并发性,您可能需要多次运行它)

@Test
public void interrupt3() throws Exception{
    int callableNum = 100;
    int executorThreadNum = 10;
    final AtomicInteger interruptCounter = new AtomicInteger(0);
    final ExecutorService executorService = Executors.newFixedThreadPool(executorThreadNum);
    final List <Callable <Object>> executeds = new ArrayList <Callable <Object>>();
    for (int i = 0; i < callableNum; ++i) {
        executeds.add(new Waiter(interruptCounter));
    }
    Thread watcher = new Thread(new Runnable() {

        @Override
        public void run(){
            try {
                executorService.invokeAll(executeds);
            } catch(InterruptedException ex) {
                // NOOP
            }
        }
    });
    watcher.start();
    Thread.sleep(200);
    watcher.interrupt();
    Thread.sleep(200);
    assertEquals(10, interruptCounter.get());
}

// This class just waits for 20 seconds, recording it's interrupts
private class Waiter implements Callable <Object> {
    private AtomicInteger    interruptCounter;

    public Waiter(AtomicInteger interruptCounter){
        this.interruptCounter = interruptCounter;
    }

    @Override
    public Object call() throws Exception{
        try {
            Thread.sleep(20000);
        } catch(InterruptedException ex) {
            interruptCounter.getAndIncrement();
        }
        return null;
    }
}

使用 WinXP 32 位、Oracle JRE 1.6.0_27 和 JUnit4

最佳答案

我不同意你应该只接收 10 次中断的假设。

Assume the CPU has 1 core.
1. Main thread starts Watcher and sleeps
2. Watcher starts and adds 100 Waiters then blocks
3. Waiter 1-10 start and sleep in sequence
4. Main wakes and interrupts Watcher then sleeps
5. Watcher cancels Waiter 1-5 then is yielded by the OS   (now we have 5 interrupts)
6. Waiter 11-13 start and sleep
7. Watcher cancels Waiter 6-20 then is yielded by the OS   (now we have 13 interrupts)
8. Waiter 14-20 are "started" resulting in a no-op
9. Waiter 21-24 start and sleep
....

从本质上讲,我的论点是不能保证 Watcher 线程在它必须产生时间片并允许 ExecutorService 的工作线程启动更多 Waiter 任务之前将被允许取消所有 100 个“Waiter”RunnableFuture 实例。

更新:显示来自 AbstractExecutorService

的代码
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        for (Future<T> f : futures) {
            if (!f.isDone()) {
                try {
                    f.get(); //If interrupted, this is where the InterruptedException will be thrown from
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (Future<T> f : futures)
                f.cancel(true); //Specifying "true" is what allows an interrupt to be sent to the ExecutorService's worker threads
    }
}

包含 f.cancel(true) 的 finally block 是将中断传播到当前正在运行的任务的时间。如您所见,这是一个紧凑的循环,但不能保证执行循环的线程能够在一个时间片内遍历 Future 的所有实例。

关于Java ExecutorService invokeAll() 中断,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7976697/

相关文章:

java - NIO 持久化数组 : Java

Java 圆形(6 段)

java - then在 CompletableFuture 中应用

用于多线程系统的 Java 不可变对象(immutable对象)。我做错了什么?

java - 从 jar 文件中提取 dll

java - 如何获得字符串的 UTF-8 转换

java - 有没有办法将线程的命运与对象联系起来?

c# - 我如何让我的方法等待所有线程完成?

java - Thread.start() 返回时是否保证线程已经启动?

c - 如何通过基本信号处理在c中同步多个进程