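This is for a custom UDTF in a Hive query. CreateLogTable
is the UDTF class that I am using as a temporary object for testing. I create one thread per file to be downloaded from Amazon S3,
and I wait for a thread to become available before assigning another file to it.
Main test logic: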
CreateLogTable CLT = new CreateLogTable();
int numThreads = 2;
int index = 0;
DownloadFileThread[] dlThreads = new DownloadFileThread[numThreads];
for (S3ObjectSummary oSummary : bucketKeys.getObjectSummaries()) {
    while (dlThreads[index] != null && dlThreads[index].isAlive()) {
        index += 1;
        index = index % numThreads;
    }
    dlThreads[index] = new DownloadFileThread(CLT, getBucket(oSummary.getBucketName() + "/"
            + oSummary.getKey()), getFile(oSummary.getKey()), index);
    dlThreads[index].start();
    index += 1;
    index = index % numThreads;
}
Thread class (run() method):
try {
    System.out.println("Creating thread " + this.threadnum);
    this.fileObj = this.S3CLIENT.getObject(new GetObjectRequest(this.filePath, this.fileName));
    this.fileIn = new Scanner(new GZIPInputStream(this.fileObj.getObjectContent()));
    while (this.fileIn.hasNext()) {
        this.parent.forwardToTable(fileIn.nextLine());
    }
    System.out.println("Finished " + this.threadnum);
} catch (Throwable e) {
    System.out.println("Downloading of " + this.fileName + " failed.");
}
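The while loop before the thread creation is supposed to keep looping until it finds a slot holding either null or a dead thread. Once it exits, a new thread is created and started. Because I log to the console, I was able to observe this process, but the output was unexpected: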
Creating thread 0
Creating thread 1
Creating thread 0
Creating thread 1
Creating thread 0
Creating thread 1
Creating thread 0
...
Creating thread 1
Creating thread 0
Creating thread 1
Finished 0
Finished 1
Finished 1
Finished 0
Finished 1
Finished 1
...
Finished 0
Finished 1
Finished 0
Finished 1
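The above is only the first few lines of the output. The problem is that more than two threads are created before any thread has finished its task.
Why does this happen, and how can I fix it?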
Best answer
I reduced your code to this test case:
public class ThreadTest {
    private static class SleepThread extends Thread {
        private final int index;
        SleepThread(int ii) { index = ii; }
        @Override
        public void run() {
            System.out.println("Creating thread " + this.index);
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Finished " + this.index);
        }
    }
    public static void main(String[] args) {
        int numThreads = 2;
        int index = 0;
        SleepThread[] dlThreads = new SleepThread[numThreads];
        for (int ii = 0; ii < 10; ++ii) {
            while (dlThreads[index] != null && dlThreads[index].isAlive()) {
                index += 1;
                index = index % numThreads;
            }
            dlThreads[index] = new SleepThread(index);
            dlThreads[index].start();
            index += 1;
            index = index % numThreads;
        }
    }
}
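With Sun JDK 1.7.0_75, running this test case produces the result you expect: two threads start, they exit after five seconds, two more threads start, and so on.
The next thing I would suspect is that your JVM's Thread.isAlive() implementation does not return true immediately after the thread is started, although that would seem to contradict the documentation of the Thread class, which says a thread is alive if it has been started and has not yet died.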
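If polling isAlive() is in doubt, one way to sidestep it is to let a fixed-size thread pool cap the concurrency. The following is only a minimal sketch, assuming Java 8 or later: BoundedDownloads, fakeDownload and runAll are hypothetical names, and the sleep merely stands in for the S3 download and the forwardToTable calls from the question. It records the highest number of tasks seen running at once, so the cap can be checked.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedDownloads {
    // Number of tasks running right now, and the highest value seen so far.
    static final AtomicInteger running = new AtomicInteger();
    static final AtomicInteger peak = new AtomicInteger();

    // Hypothetical stand-in for downloading and parsing one file.
    static void fakeDownload(int fileNo) throws InterruptedException {
        int now = running.incrementAndGet();
        peak.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(100); // placeholder for the real work
        } finally {
            running.decrementAndGet();
        }
    }

    // Submits numFiles tasks to a pool of numThreads workers and
    // returns the peak number of tasks that ran concurrently.
    static int runAll(int numThreads, int numFiles) throws InterruptedException {
        running.set(0);
        peak.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numFiles; i++) {
            final int fileNo = i;
            // Returning a value makes this lambda a Callable, which may throw.
            pool.submit(() -> { fakeDownload(fileNo); return null; });
        }
        pool.shutdown();                            // accept no new tasks
        pool.awaitTermination(1, TimeUnit.MINUTES); // wait for queued tasks
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Peak concurrent tasks: " + runAll(2, 10));
    }
}
```

The pool queues the extra tasks internally, so the submitting loop never has to inspect thread state or spin while it waits for a free slot.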
Regarding "java - Why does this program create more threads than possible?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/31278765/