我有一个脚本,它创建一个队列和一些从队列中读取其作业的工作人员。我的问题是现在脚本不会终止并调用 printData(),因为线程处于空闲状态。这是因为我没有将队列设置为 undef。
我尝试了很多不同的方法,但都导致了各种问题。
- 尽管队列中仍有作业,但队列已终止
- 或者,尽管仍有一个线程正在工作并尝试将新工作插入队列,但此时队列中没有作业。
我使用以下代码
# -------------------------
# Main
# -------------------------
my @threads = map threads->create(\&doOperation), 1 .. $maxNumberOfParallelJobs;
pullDataFromDbWithDirectory($directory);
#$worker->enqueue((undef) x $maxNumberOfParallelJobs);
$_->join for @threads;
sub pullDataFromDbWithDirectory {
my $_dir = $_[0];
if ($itemCount <= $maxNumberOfItems) {
my @retval = grep { /^Dir|^File/ } qx($omnidb -filesystem $filesystem '$label' -listdir '$_dir');
foreach my $item (@retval) {
$itemCount++;
(my $filename = $item) =~ s/^File\s+|^Dir\s+|\n//g;
my $file = "$_dir/$filename";
push(@data,$file);
if ($item =~ /^Dir/) {
$worker->enqueue($file);
print "Add $file to queue\n" if $debug;
}
}
}
}
sub doOperation () {
my $ithread = threads->tid();
do {
my $folder = $worker->dequeue();
print "Read $folder from queue with thread $ithread\n" if $debug;
pullDataFromDbWithDirectory($folder);
} while ($worker->pending());
push(@IDLE_THREADS,$ithread);
}
编辑:
我找到了一个丑陋的解决方案。也许还有更好的?我将工作人员添加到 IDLE 数组中并休眠,直到所有工作人员都在其中
sleep 0.01 while (scalar @IDLE_THREADS < $maxNumberOfParallelJobs);
$worker->enqueue((undef) x $maxNumberOfParallelJobs);
$_->join for @threads;
最佳答案
您不能使用 ->pending()
而不让线程过早终止。修复:
my $busy: shared = $num_workers;
sub pullDataFromDbWithDirectory {
my $tid = threads->tid();
while (defined( my $folder = $q->dequeue() )) {
{ lock $busy; ++$busy; }
print "Worker thread $tid processing folder $folder.\n" if $debug;
pullDataFromDbWithDirectory($folder);
{ lock $busy; --$busy; }
}
print "Worker thread $tid exiting.\n" if $debug;
}
sleep 0.01 while $q->pending || $busy;
$worker->end();
$_->join for @threads;
但这引入了竞争条件。
- 工作线程将当前队列中的最后一项出列
- 主线程检查待处理(false)
- 主线程检查繁忙线程的数量(无)
- 主线程向工作线程发出结束信号
- 所有其他工作线程退出。
- 使上述项目出列的工作进程将自己标记为忙碌
- 工作线程开始处理最后一个项目,尝试在队列中添加一堆项目,但失败了。
出队加上忙增量需要是原子的,挂起检查加上忙检查需要是原子的。
如果不改变 Thread::Queue 就不可能做到这一点。您不能只是在这两段代码周围加一个锁,因为这会阻止主线程在其中一个线程空闲时检查所有线程是否都空闲。
我们需要将 ->dequeue
拆分为等待组件和出队组件。我们有后者(->dequeue_nb
),所以我们只需要前者。
use Thread::Queue 3.01;
sub T_Q_wait {
my $self = shift;
lock(%$self);
my $queue = $$self{'queue'};
my $count = @_ ? $self->_validate_count(shift) : 1;
# Wait for requisite number of items
cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
cond_signal(%$self) if (@$queue);
return !$$self{'ENDED'};
}
现在我们可以编写解决方案:
my $busy: shared = 0;
sub pullDataFromDbWithDirectory {
my $tid = threads->tid();
WORKER_LOOP:
while (T_Q_wait($q)) {
my $folder;
{
lock $busy;
$folder = $q->dequeue_nb();
next WORKER_LOOP if !defined($folder);
++$busy;
}
print "Worker thread $tid processing folder $folder.\n" if $debug;
pullDataFromDbWithDirectory($folder);
{
lock $busy;
--$busy;
cond_signal($busy) if !$busy;
}
}
}
{
lock $busy;
cond_wait($busy) while $busy;
$q->end();
$_->join() for threads->list();
}
next
是为了防止另一个线程在 wait
和 dequeue_nb
之间抢占工作。
关于multithreading - 使用多线程时我必须在哪里取消定义队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24014663/