multithreading - 使用多线程时我必须在哪里取消定义队列

标签 multithreading perl queue

我有一个脚本,它创建一个队列和一些从队列中读取其作业的工作人员。我的问题是现在脚本不会终止并调用 printData(),因为线程处于空闲状态。这是因为我没有将队列设置为 undef。

我尝试了很多不同的方法,但都导致了各种问题。

  • 尽管队列中仍有作业,但队列已终止
  • 或者,尽管仍有一个线程正在工作并尝试将新工作插入队列,但此时队列中没有作业。

我使用以下代码

# -------------------------
# Main
# -------------------------
my @threads = map threads->create(\&doOperation), 1 .. $maxNumberOfParallelJobs;
pullDataFromDbWithDirectory($directory);
#$worker->enqueue((undef) x $maxNumberOfParallelJobs);
$_->join for @threads;

sub pullDataFromDbWithDirectory {
    my $_dir = $_[0];

    if ($itemCount <= $maxNumberOfItems) {
        my @retval = grep { /^Dir|^File/ } qx($omnidb -filesystem $filesystem  '$label'  -listdir '$_dir');

        foreach my $item (@retval) {
            $itemCount++;
            (my $filename = $item) =~ s/^File\s+|^Dir\s+|\n//g;
            my $file = "$_dir/$filename";
            push(@data,$file);

            if ($item =~ /^Dir/) {
                $worker->enqueue($file);
                print "Add $file to queue\n" if $debug;
            }
        }
    }
}

sub doOperation () {
    my $ithread = threads->tid();
    do {
       my $folder = $worker->dequeue();
       print "Read $folder from queue with thread $ithread\n" if $debug;
       pullDataFromDbWithDirectory($folder);
   } while ($worker->pending());

   push(@IDLE_THREADS,$ithread);

}

编辑:

我找到了一个丑陋的解决方案。也许还有更好的?我将工作人员添加到 IDLE 数组中并休眠,直到所有工作人员都在其中

sleep 0.01 while (scalar @IDLE_THREADS < $maxNumberOfParallelJobs);
$worker->enqueue((undef) x $maxNumberOfParallelJobs);
$_->join for @threads;

最佳答案

您不能使用 ->pending() 而不让线程过早终止。修复:

my $busy: shared = $num_workers;

sub pullDataFromDbWithDirectory {
    my $tid = threads->tid();
    while (defined( my $folder = $q->dequeue() )) {
        { lock $busy; ++$busy; }
        print "Worker thread $tid processing folder $folder.\n" if $debug;
        pullDataFromDbWithDirectory($folder);
        { lock $busy; --$busy; }
    }

    print "Worker thread $tid exiting.\n" if $debug;
}

sleep 0.01 while $q->pending || $busy;
$worker->end();
$_->join for @threads;

但这引入了竞争条件。

  1. 工作线程将当前队列中的最后一项出列
  2. 主线程检查待处理(false)
  3. 主线程检查繁忙线程的数量(无)
  4. 主线程向工作线程发出结束信号
  5. 所有其他工作线程退出。
  6. 使上述项目出列的工作进程将自己标记为忙碌
  7. 工作线程开始处理最后一个项目,尝试在队列中添加一堆项目,但失败了。

出队加上忙增量需要是原子的,挂起检查加上忙检查需要是原子的。

如果不改变 Thread::Queue 就不可能做到这一点。您不能只是在这两段代码周围加一个锁,因为这会阻止主线程在其中一个线程空闲时检查所有线程是否都空闲。

我们需要将 ->dequeue 拆分为等待组件和出队组件。我们有后者(->dequeue_nb),所以我们只需要前者。

use Thread::Queue 3.01;

sub T_Q_wait {
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Wait for requisite number of items
    cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
    cond_signal(%$self) if (@$queue);

    return !$$self{'ENDED'};
}

现在我们可以编写解决方案:

my $busy: shared = 0;

sub pullDataFromDbWithDirectory {
    my $tid = threads->tid();

    WORKER_LOOP:
    while (T_Q_wait($q)) {
        my $folder;

        {
            lock $busy;
            $folder = $q->dequeue_nb();
            next WORKER_LOOP if !defined($folder);
            ++$busy;
        }

        print "Worker thread $tid processing folder $folder.\n" if $debug;
        pullDataFromDbWithDirectory($folder);

        {
            lock $busy;
            --$busy;
            cond_signal($busy) if !$busy;
        }
    }
}

{
    lock $busy;
    cond_wait($busy) while $busy;
    $q->end();
    $_->join() for threads->list();
}

next 是为了防止另一个线程在 waitdequeue_nb 之间抢占工作。

关于multithreading - 使用多线程时我必须在哪里取消定义队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24014663/

相关文章:

perl - 我的应用程序如何在用户不自己安装的情况下使用 CPAN 模块?

子程序返回中对哈希的 perl 赋值变得很奇怪

python - 异步抓取并使用django celery和redis存储结果并存储我的正确方法是什么?

android - Toast 未显示并出错

multithreading - 是否有一个 API 可以让 N 个线程(或 N 个线程上的 N 个闭包)完成?

Linux 每个进程的资源限制——一个深奥的 Red Hat 之谜

perl - 运行 Build.pl 非交互式

C - 使用出队进行回绕时队列的段错误

c++ - 尝试运行队列程序时出现段错误(核心已转储)- C++

android - 使用 Kotlin 协程将 Java 线程处理消息转换为队列