我有以下工作队列实现,我用它来限制使用中的线程数。它的工作原理是我最初将一些 Runnable 对象添加到队列中,当我准备好开始时,我运行“begin()”。此时我不再向队列中添加任何内容。
public class WorkQueue {
private final int nThreads;
private final PoolWorker[] threads;
private final LinkedList queue;
Integer runCounter;
boolean hasBegun;
public WorkQueue(int nThreads) {
runCounter = 0;
this.nThreads = nThreads;
queue = new LinkedList();
threads = new PoolWorker[nThreads];
hasBegun = false;
for (int i = 0; i < nThreads; i++) {
threads[i] = new PoolWorker();
threads[i].start();
}
}
public boolean isQueueEmpty() {
synchronized (queue) {
if (queue.isEmpty() && runCounter == 0) {
return true;
} else {
return false;
}
}
}
public void begin() {
hasBegun = true;
synchronized (queue) {
queue.notify();
}
}
public void add(Runnable r) {
if (!hasBegun) {
synchronized (queue) {
queue.addLast(r);
runCounter++;
}
} else {
System.out.println("has begun executing. Cannot add more jobs ");
}
}
private class PoolWorker extends Thread {
public void run() {
Runnable r;
while (true) {
synchronized (queue) {
while (queue.isEmpty()) {
try {
queue.wait();
} catch (InterruptedException ignored) {
}
}
r = (Runnable) queue.removeFirst();
}
// If we don't catch RuntimeException,
// the pool could leak threads
try {
r.run();
synchronized (runCounter) {
runCounter--;
}
} catch (RuntimeException e) {
// You might want to log something here
}
}
}
}
}
这是我用来跟踪工作队列中所有作业何时完成的可运行对象:
public class QueueWatcher implements Runnable {
private Thread t;
private String threadName;
private WorkQueue wq;
public QueueWatcher(WorkQueue wq) {
this.threadName = "QueueWatcher";
this.wq = wq;
}
@Override
public void run() {
while (true) {
if (wq.isQueueEmpty()) {
java.util.Date date = new java.util.Date();
System.out.println("Finishing and quiting at:" + date.toString());
System.exit(0);
break;
} else {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
Logger.getLogger(PlaneGenerator.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
public void start() {
wq.begin();
System.out.println("Starting " + threadName);
if (t == null) {
t = new Thread(this, threadName);
t.setDaemon(false);
t.start();
}
}
}
这是我使用它们的方式:
Workqueue wq = new WorkQueue(9); //Get same results regardless of 1,2,3,8,9
QueueWatcher qw = new QueueWatcher(wq);
SomeRunnable1 sm1 = new SomeRunnable1();
SomeRunnable2 sm2 = new SomeRunnable2();
SomeRunnable3 sm3 = new SomeRunnable3();
SomeRunnable4 sm4 = new SomeRunnable4();
SomeRunnable5 sm5 = new SomeRunnable5();
wq.add(sm1);
wq.add(sm2);
wq.add(sm3);
wq.add(sm4);
wq.add(sm5);
qw.start();
但是无论我使用多少个线程,结果总是一样的——它总是需要大约 1 分钟 10 秒才能完成。这与我刚刚执行单线程版本时(当所有内容都在 main() 中运行时)大致相同。 如果我将 wq 设置为 (1,2,3--9) 个线程,它总是在 1m8s-1m10s 之间。问题是什么 ?作业(someRunnable)之间没有任何关系,不能互相阻塞。
编辑:每个可运行程序只是从文件系统中读取一些图像文件并在单独的目录中创建新文件。新目录最终包含大约 400 个输出文件。
编辑:似乎只有一个线程总是在工作。我做了以下更改:
我让 Woolworker 存储一个 Id
PoolWorker(int id){
this.threadId = id;
}
在运行之前,我打印了工作人员的 ID。
System.out.println(this.threadId + " got new task");
r.run();
在创建 poolworkers 时,在 WorkQueue 构造函数中我这样做:
for (int i = 0; i < nThreads; i++) {
threads[i] = new PoolWorker(i);
threads[i].start();
}
但似乎只有线程 0 执行任何工作,因为输出始终是:
0 got new task
最佳答案
使用queue.notifyAll()
开始处理。
目前您正在使用 queue.notify()
,它只会唤醒一个线程。 (让我指出这一点的重要线索是当你提到只有一个线程在运行时。)
此外,Integer runCounter
上的同步并没有按照您的想法进行 - runCounter++
实际上是为 Integer
分配一个新值每次,所以你在很多不同的 Integer
对象上进行同步。
附带说明一下,即使对于最好的程序员来说,使用原始线程和wait/notify
范例也是复杂且容易出错的——这就是 Java 引入 java.util.concurrent
package 的原因,它提供线程安全的 BlockingQueue
实现和 Executors
以轻松管理多线程应用程序。
关于java - 为什么多线程版本与单线程版本花费相同的时间?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35237944/