java - ExecutorCompletionService 未获取 Callable 返回的项目?

标签 java multithreading concurrency

我的 ExecutorCompletionService 出现了奇怪的行为。该项目被添加到 ExecutorCompletionService.submit() 中。然后它会被处理并由之前提交的 Callable 工作线程返回。返回之后,ExecutorCompletionService.take() 永远不会看到它,因此永远不会看到返回更多项目的阻塞?我真的不知道发生了什么事。我已经创建了打印行,并且可以看到它完成了可调用工作线程。一旦发生这种情况,ExecutorCompletionService.take 就应该准备好接受,但在某些情况下,事情会锁定,有时又没问题?

我创建了一个测试用例,如果您运行它几次,您会发现它在某些情况下会锁定并且永远不会占用任何已完成的线程

线程死锁演示

import java.util.Observable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadDeadlockDemo extends Observable implements Runnable  {

private CompletionService<String> pool;
private ExecutorService executor ;
private Thread responseWorkerThread;
private HttpSchedulerWorker schedulerWorker;
private boolean shouldRun = true;
private int numThreadsInPool;
private BlockingQueue<String> queue;
public ThreadDeadlockDemo(int numThreads)
{
    numThreadsInPool = numThreads;
    executor = Executors.newFixedThreadPool(numThreads);
    pool = new ExecutorCompletionService<String>(executor);
    schedulerWorker = new HttpSchedulerWorker();
    responseWorkerThread = new Thread(schedulerWorker);
    responseWorkerThread.start();
    queue = new LinkedBlockingQueue<String>();
    new Thread(this).start();
}

public ThreadDeadlockDemo()
{
    numThreadsInPool = 1;
    executor = Executors.newFixedThreadPool(1);
    pool = new ExecutorCompletionService<String>(executor);
    schedulerWorker = new HttpSchedulerWorker();
    responseWorkerThread = new Thread(schedulerWorker);
    responseWorkerThread.start();
    queue = new LinkedBlockingQueue<String>();
    new Thread(this).start();
}

public void setThreadCount(int numThreads)
{
    executor = Executors.newFixedThreadPool(numThreads);
    pool = new ExecutorCompletionService<String>(executor);
    numThreadsInPool = numThreads;
}

public void add(String info) {
    queue.add(info);
}

@Override
public void run() {
    // TODO Auto-generated method stub
    while(shouldRun)
    {   
        try {
            String info = queue.take();
            Callable<String> worker = new WorkerThread(info);
            System.out.println("submitting to pooler: " + info);
            pool.submit(worker);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }   
    }
}

/**
 * Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they
 * are complete it will send them to server for completion.
 * @author Steve
 *
 */
class HttpSchedulerWorker  implements Runnable{

    public void run() {
        // TODO Auto-generated method stub
        while(true)
        {
            String vulnInfo = null;
            try {
                Future<String>  tmp = pool.take();
            //  Future<VulnInfo> tmp = pool.poll();
                if(tmp != null)
                    vulnInfo = tmp.get();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            if(vulnInfo != null)
            {
                System.out.println("info was taken from pool completed: "  + vulnInfo);
            }



        }
    }

}

}

WorkerClass:这是添加到执行程序池并返回的线程工作线程,但在某些情况下永远不会在 ThreadlockDemos ExecutorCompletionService 池中收到通知?

import java.util.concurrent.Callable;

public class WorkerThread implements Callable<String>{


String info;
WorkerThread(String info)
{
    this.info = info;
}

//@Override
public String call() throws Exception {
    System.out.println("sending vuln info: " + info);
    return info;
}


}

这是我的测试类,只是将项目添加到队列中。这是我的控制台的打印输出,看起来似乎失败了。它添加到队列中并对其进行处理并返回值。但是 take() 从来没有被调用过,为什么呢?它有时有效,有时失败,这让我很难看出问题所在。我很想说它在 java 中的错误,但我环顾四周没有发现这些类有任何问题?

 public class HttpSchedulerThreadedUnitTest {

ThreadDeadlockDemo scheduler; 
public HttpSchedulerThreadedUnitTest(){

    setupScheduler();
    for(int i=0; i < 5;i++)
    {
        scheduler.add(i+"");
    }
}

private void setupScheduler()
{
    scheduler = new ThreadDeadlockDemo();
    scheduler.setThreadCount(1);
}

public static void main(String[] args)
{
    new HttpSchedulerThreadedUnitTest();
}

}

控制台打印:这是它在 WorkerThread 完成时从不从池中获取的运行情况 提交到池化器:0 提交到池化器:1 提交到池化器:2 发送漏洞信息:0 提交到池化器:3 发送漏洞信息:1 提交到池化器:4 发送漏洞信息:2 发送漏洞信息:3 发送漏洞信息:4

控制台打印:它实际上正在从池中获取项目返回! 提交到池化器:0 提交到池化器:1 提交到池化器:2 提交到池化器:3 提交到池化器:4 发送漏洞信息:0 信息已从已完成的池中获取:0 发送漏洞信息:1 信息已从已完成的池中获取:1 发送漏洞信息:2 信息从已完成的池中获取:2 发送漏洞信息:3 信息从已完成的池中获取:3 发送漏洞信息:4 信息已从池中获取完成:4

最佳答案

这是很多代码。如果你能减少它(通过删除 http 相关部分等),那就太好了。我也不确定你的意思 After that return the ExecutorCompletionService.take 永远不会看到它,所以永远不会看到返回更多项目的阻塞?

您可以在锁定时进行线程转储,并查看哪个线程被锁定在代码的哪个点。

同时,我确实看到一些看起来错误的代码。

while(requestQueue.isEmpty()){
            try {
                synchronized(this)
                {
                    wait();
                }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            }

在这里,您正在同步一个可运行的对象。这几乎总是错误的,因为可运行对象通常不会被多个线程访问。您还在测试同步语句之外的条件。通常您按如下方式使用等待:

synchronized(lock){
    while(!condition){
        wait();
    }
}

但是,我没有看到任何在可运行对象上调用通知的代码。这可能会导致程序挂起。基本上你在等待某件事,但没有人叫醒你,所以你无限期地等待。这是否是您所面临问题的原因,可以通过在发生这种情况时查看线程转储来轻松确定。

如果您使用队列,最好的建议是使用阻塞队列作为请求队列。这样您就不必完全执行此等待/通知业务。

关于java - ExecutorCompletionService 未获取 Callable 返回的项目?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6977617/

相关文章:

java - Groovy 数组初始化

java - 当数据库中有多个对象时,Spring boot rest 响应在 OneToMany 上给出 1 个结果

c# - 如何使用 System.Threading.Parallel (.Net 4.0) 增加并发并行任务

haskell - 确保 Haskell 中线程之间的 CPU 时间分配均匀

go - Go 中的并发和超时

java - 在Java中获取带有时区的特定日期

java - JPA illegalStateException - CascadeType 问题

java - Java 监视器的等待集是否优先于入口集?

java - 线程同步行为

Python 多线程/处理模块,用于具有需要排序的依赖项的任务