java - 队列中的多线程作业 - 作业太多?

标签 java multithreading queue

假设我有一个很大的队列,大约有 10,000 个对象。我想创建一个具有 5 个工作线程的线程池,每个线程从队列中删除一项并对其进行处理,直到队列为空。

我担心的是,使用我在不同地方看到的设置,我最终会立即创建 10,000 个工作岗位,但通过 5 个 worker 来执行它们。我觉得这并不是真正可扩展的 - 队列已经有 10,000 个项目,现在堆栈上有额外的 10,000 个作业(即使它们没有主动执行,这似乎是一个内存问题)。

这似乎就是这个答案所暗示的:https://stackoverflow.com/a/9916299/774359 - 这是“//现在提交我们的工作”部分让我担心。我有效地将队列转储到作业中是否有问题?

这是我迄今为止所拥有的一个简短示例:

在主函数中:

ExecutorService executor = Executors.newFixedThreadPool(5);
while(!hugeQueue.isEmpty()) {
    String work = hugeQueue.remove();
    System.out.println("Creating job for " + work);
    Runnable worker = new Worker(work);
    executor.execute(worker);
}

在 Worker 类中:

public Worker(String itemFromQueue) { this.job = itemFromQueue; }

@Override
public void run() {
     System.out.println("Working on " + this.itemFromQueue);
     //Do actual work
}

hugeQueue 包含 10,000 个数字时,我会看到所有 10,000 条“正在创建作业”消息,然后是所有 10,000 条“正在处理”消息。我认为如果一次只创建 5 个作业,然后开始工作,那就更好了——当一个线程打开时,它会创建另一个作业,然后开始工作。这样一来,堆栈上就永远不会有 10,000 个作业。我将如何实现这一目标?我对这个架构的思考是否正确?

<小时/>

编辑以包含基于答案的更新信息:

@seneque 的代码没有直接编译,所以我做了一些小的更改 - 不幸的是,它的输出只是 worker 的创建,而不是实际的工作。

在主函数中:

int numOfThreads = 5;
BlockingQueue<Integer> hugeQueue = new LinkedBlockingQueue<>();
for(int x = 0; x < 1000; x++) { hugeQueue.add(x); }

ExecutorService executor = Executors.newFixedThreadPool(numOfThreads);
LongRunningWorker longRunningWorker = new LongRunningWorker();

for( int i = 0; i < numOfThreads ; i++ ) {
    System.out.println("Created worker #" + i);
    executor.submit(longRunningWorker);
}
System.out.println("Done");

在 LongRunningWorker 中:

public class LongRunningWorker implements Runnable {
    BlockingQueue<Integer> workQueue;
    void spiderExmaple(BlockingQueue<Integer> workQueue) {
        this.workQueue = workQueue;
    }

    @Override
    public void run() {
        try {
            while(workQueue.poll(3, TimeUnit.SECONDS) != null) {
                Integer work = workQueue.remove();
                System.out.println("Working on " + work);
                new Worker(work).run();
            }
        } catch (InterruptedException e) { e.printStackTrace(); }
    }
}

在 worker 中:

public class Worker implements Runnable{
    Integer work;
    Worker(Integer x) { this.work = x; }

    @Override
    public void run() {
        System.out.println("Finished work on " + this.work);

    }
}

最佳答案

一种解决方案是让五个线程直接轮询队列。

BlockingQueue<String> hugeQueue = ...
ExecutorService executor = Executors.newFixedThreadPool(5);
LongRunningWorker longRunningWorker = new LongRunningWorker(hugeQueue);
for( int i = 0; i < 5 ; i++ ) {
    executor.submit(longRunningWorker)
}

然后 LongRunningWorker 的定义如下:

class LongRunningWorker(BlockingQueue<String> workQueue) extends Runnable {
    final BlockingQueue<String> workQueue;
    LongRunningWorker(BlockingQueue<String> workQueue) {
        this.workQueue = workQueue;
    }   

    public void run() {
       while((String work =  workQueue.poll(3, TimeUnit.Second) != null) {
           try {
               new Worker(work).run();
           } catch (Exception e) {
               // 
           }
        }
    }
}

关于java - 队列中的多线程作业 - 作业太多?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41505307/

相关文章:

javascript - 自己线程中的 Selenium 异步脚本会阻止其他脚本

java - 在 ScheduledThreadPoolExecutor 中使用带有比较器的 PriorityBlockingQueue

java - 让带有泛型实现的队列打印特定的对象属性

Python多处理,代码继续执行?

java - java.util.hashMap 中的 init 方法

java - FirebaseRecyclerAdapter 28.0

c# - 重新抛出任务中的异常不会使任务进入故障状态

java - 如何在同一个Jslider上添加两个指针?

java - 对 Eclipse 和 Netbeans 使用相同的 java 层次结构

php - Laravel Forge 在同一台服务器上的多个队列