具有已知池大小但未知工作人员的 Java- FixedThreadPool

标签 java multithreading threadpool executor

所以我想我有点理解固定线程池的工作原理(使用 Java 中内置的 Executor.fixedThreadPool),但据我所知,通常有一定数量的工作需要完成并且你知道有多少到什么时候完成你启动程序。例如

int numWorkers = Integer.parseInt(args[0]);
int threadPoolSize = Integer.parseInt(args[1]);
ExecutorService tpes =
    Executors.newFixedThreadPool(threadPoolSize);
WorkerThread[] workers = new WorkerThread[numWorkers];
for (int i = 0; i < numWorkers; i++) {
    workers[i] = new WorkerThread(i);
    tpes.execute(workers[i]);
}

每个 workerThread 做的事情都非常简单,那部分是任意的。我想知道的是,如果您有一个固定的池大小(比如最大 8 个),但您不知道在运行时之前需要多少工作人员才能完成任务。

具体示例是:如果我的池大小为 8,并且我正在从标准输入读取。在我阅读时,我将输入分成一组大小的 block 。这些 block 中的每一个都被提供给一个线程(连同一些其他信息),以便他们可以压缩它。因此,我不知道需要创建多少个线程,因为我需要继续执行直到到达输入末尾。我还必须以某种方式确保数据保持相同的顺序。如果线程 2 在线程 1 之前完成并且只是提交它的工作,我的数据就会乱序!

在这种情况下线程池会是错误的方法吗?看起来会很棒(因为我一次不能使用超过 8 个线程)。

基本上,我想做这样的事情:

ExecutorService tpes = Executors.newFixedThreadPool(threadPoolSize);
BufferedInputStream inBytes = new BufferedInputStream(System.in);
byte[] buff = new byte[BLOCK_SIZE];
byte[] dict = new byte[DICT_SIZE];
WorkerThread worker;
int bytesRead = 0;

while((bytesRead = inBytes.read(buff)) != -1) {
   System.arraycopy(buff, BLOCK_SIZE-DICT_SIZE, dict, 0, DICT_SIZE);
   worker = new WorkerThread(buff, dict)   
   tpes.execute(worker);
}

这不是工作代码,我知道,但我只是想说明我想要什么。

我遗漏了一点,但看看 buff 和 dict 的值是如何变化的,我不知道输入有多长。我不认为我实际上不能做这个想法,因为,好 worker 在第一次打电话后就已经存在了!我不能只说 worker = new WorkerThread 很多次,因为它不是已经指向一个现有线程(是的,一个可能已经死了的线程)并且显然在这个实现中如果它确实有效我就不会运行在平行下。但我的观点是,我想继续创建线程直到达到最大池大小,等待线程完成,然后继续创建线程直到达到输入末尾。

我还需要保持秩序井然,这是非常烦人的部分。

最佳答案

您的解决方案完全没问题(唯一的一点是,如果您的 WorkerThread 的工作量非常小,则可能不需要并行)。

有了线程池,提交任务的数量是无关紧要的。池中的线程数可能少于或多于线程池,线程池负责处理。

然而,这很重要:您依赖于 WorkerThread 的某种结果顺序,但在使用并行性时,无法保证此顺序!不管你是否使用线程池,或者你有多少工作线程等等,你的结果总是有可能以任意顺序完成!

为了保持顺序正确,给每个 WorkerThread 其构造函数中的当前项的编号,并让它们在完成后按正确的顺序放置结果:

int noOfWorkItem = 0;
while((bytesRead = inBytes.read(buff)) != -1) {
   System.arraycopy(buff, BLOCK_SIZE-DICT_SIZE, dict, 0, DICT_SIZE);
   worker = new WorkerThread(buff, dict, noOfWorkItem++)   
   tpes.execute(worker);
}

关于具有已知池大小但未知工作人员的 Java- FixedThreadPool,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13110313/

相关文章:

multithreading - 在 FutureTask 中包装 Callable/Runnable 有什么好处?

java - fixedThreadPool 的线程数是否可以少于分配的线程数?

c# - ASP.Net 中的线程敏捷性是什么意思?

c++ - vtkUnstructuredGrid->GetPoint() 的线程安全只读替代方案

java - Tomcat 不遵守 catalina.properties 中的 maxThreads 配置,activeCount 不超过 200

java - REST API 和多态性

java - Solr 如何在重新启动之间保留索引数据

java - 将 .txt 的内容放入字符串中

java - 如何扩展接口(interface)

c++ - 是否预计 boost::thread_specific_ptr<>::get() 的使用会很慢?有什么解决办法吗?