我试图找出所有 ForkJoinPool 线程何时完成了它们的任务。 我编写了这个测试应用程序(我使用了 System.out,因为它只是一个快速测试应用程序,并且没有错误检查/处理):
public class TestForkJoinPoolEnd {
private static final Queue<String> queue = new LinkedList<>();
private static final int MAX_SIZE = 5000;
private static final int SPEED_UP = 100;
public static void main(String[] args) {
ForkJoinPool customThreadPool = new ForkJoinPool(12);
customThreadPool.submit(
() -> makeList()
.parallelStream()
.forEach(TestForkJoinPoolEnd::process));
enqueue("Theard pool started up");
int counter = MAX_SIZE + 1;
while (!customThreadPool.isTerminating()) {
String s = dequeue();
if (s != null) {
System.out.println(s);
counter--;
}
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
}
}
System.out.println("counter = " + counter);
System.out.println("isQuiescent = " + customThreadPool.isQuiescent() + " isTerminating " +
"= " + customThreadPool.isTerminating() + " isTerminated = "
+ customThreadPool.isTerminated() + " isShutdown =" + customThreadPool.isShutdown());
}
static List<String> makeList() {
return Stream.generate(() -> makeString())
.limit(MAX_SIZE)
.collect(Collectors.toList());
}
static String makeString() {
int leftLimit = 97; // letter 'a'
int rightLimit = 122; // letter 'z'
int targetStringLength = 10;
Random random = new Random();
StringBuilder buffer = new StringBuilder(targetStringLength);
for (int i = 0; i < targetStringLength; i++) {
int randomLimitedInt = leftLimit + (int)
(random.nextFloat() * (rightLimit - leftLimit + 1));
buffer.append((char) randomLimitedInt);
}
return buffer.toString();
}
static int toSeed(String s) {
int sum = 0;
for (int i = 0; i < s.length(); i++) {
sum += s.charAt(i);
}
return (sum / SPEED_UP);
}
static void process(String s) {
StringBuilder sb = new StringBuilder(s);
long start = System.currentTimeMillis();
try {
TimeUnit.MILLISECONDS.sleep(toSeed(s));
} catch (InterruptedException e) {
}
long end = System.currentTimeMillis();
sb.append(" slept for ")
.append((end - start))
.append(" milliseconds");
enqueue(sb.toString());
}
static void enqueue(String s) {
synchronized (queue) {
queue.offer(s);
}
}
static String dequeue() {
synchronized (queue) {
return queue.poll();
}
}
}
这段代码卡住了,永远不会完成。如果我将 while 循环的条件更改为 !customThreadPool.isQuiescent()
,它会终止循环并将计数器和队列大小设置为 1。
我应该使用什么来确定线程何时完成?
最佳答案
ExecutorService
不会仅仅因为一项作业(及其子作业)完成而自行终止。线程池背后的整体理念是可重用。
因此它只会在应用程序对其调用 shutdown()
时终止。
您可以使用 isQuiescent()
来查明是否没有待处理的作业,这仅在所有提交的作业都属于您的特定任务时才有效。使用 submit
返回的 future 来检查实际作业的完成情况要简洁得多。
无论哪种情况,排队任务的完成状态都不会说明您正在轮询的队列。当您了解提交结束时,您仍然需要检查队列中是否有未决元素。
此外,建议使用线程安全的 BlockingQueue
实现,而不是用 synchronized
block 装饰 LinkedList
。连同其他一些需要清理的东西,代码看起来像这样:
public class TestForkJoinPoolEnd {
private static final BlockingQueue<String> QUEUE = new LinkedBlockingQueue<>();
private static final int MAX_SIZE = 5000;
private static final int SPEED_UP = 100;
public static void main(String[] args) {
ForkJoinPool customThreadPool = new ForkJoinPool(12);
ForkJoinTask<?> future = customThreadPool.submit(
() -> makeList()
.parallelStream()
.forEach(TestForkJoinPoolEnd::process));
QUEUE.offer("Theard pool started up");
int counter = MAX_SIZE + 1;
while (!future.isDone()) try {
String s = QUEUE.poll(1, TimeUnit.MILLISECONDS);
if (s != null) {
System.out.println(s);
counter--;
}
} catch (InterruptedException e) {}
for(;;) {
String s = QUEUE.poll();
if (s == null) break;
System.out.println(s);
counter--;
}
System.out.println("counter = " + counter);
System.out.println("isQuiescent = " + customThreadPool.isQuiescent() + " isTerminating " +
"= " + customThreadPool.isTerminating() + " isTerminated = "
+ customThreadPool.isTerminated() + " isShutdown =" + customThreadPool.isShutdown());
customThreadPool.shutdown();
}
static List<String> makeList() {
return IntStream.range(0, MAX_SIZE)
.mapToObj(i -> makeString())
.collect(Collectors.toList());
}
static String makeString() {
int targetStringLength = 10;
Random random = new Random();
StringBuilder buffer = new StringBuilder(targetStringLength);
for (int i = 0; i < targetStringLength; i++) {
int randomLimitedInt = random.nextInt('z' - 'a' + 1) + 'a';
buffer.append((char) randomLimitedInt);
}
return buffer.toString();
}
static int toSeed(String s) {
return s.chars().sum() / SPEED_UP;
}
static void process(String s) {
long start = System.nanoTime();
try {
TimeUnit.MILLISECONDS.sleep(toSeed(s));
} catch (InterruptedException e) {
}
long end = System.nanoTime();
QUEUE.offer(s + " slept for " + (end - start)/1000000 + " milliseconds");
}
}
如果您在接收端的 sleep
调用应该模拟一些工作负载而不是等待新项目,您还可以使用
int counter = MAX_SIZE + 1;
while (!future.isDone()) {
String s = QUEUE.poll();
if (s != null) {
System.out.println(s);
counter--;
}
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {}
}
但逻辑没有改变。在 future.isDone()
返回 true
后,我们必须重新检查队列中的未决元素。我们只保证不会有新元素到达,而不能保证队列已经为空。
作为旁注,makeString()
方法可以进一步改进以
static String makeString() {
int targetStringLength = 10;
ThreadLocalRandom random = ThreadLocalRandom.current();
StringBuilder buffer = new StringBuilder(targetStringLength);
for (int i = 0; i < targetStringLength; i++) {
int randomLimitedInt = random.nextInt('a', 'z' + 1);
buffer.append((char)randomLimitedInt);
}
return buffer.toString();
}
甚至
static String makeString() {
int targetStringLength = 10;
return ThreadLocalRandom.current()
.ints(targetStringLength, 'a', 'z'+1)
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
}
关于java - 当 ForkJoinPool 结束时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57655847/