multithreading - Perl队列和线程异常退出

标签 multithreading perl

我对 Perl 很陌生,尤其是 Perl 线程。 我想要实现:

  1. 有 5 个线程将数据(随机数)排入队列 线程::队列
  2. 有 3 个线程,用于将数据从队列中取出 线程::队列。

我为实现上述任务而编写的完整代码:

#!/usr/bin/perl -w
use strict;
use threads;
use Thread::Queue;


my $queue = new Thread::Queue();
our @Enquing_threads;
our @Dequeuing_threads;

sub buildQueue
{
    my $TotalEntry=1000;
    while($TotalEntry-- >0)
    {
        my $query = rand(10000);
        $queue->enqueue($query);
        print "Enque thread with TID " .threads->tid . " got $query,";
        print "Queue Size: " . $queue->pending . "\n";
    }
}
sub process_Queue 
{
    my $query;
    while ($query = $queue->dequeue) 
    {
        print "Dequeu thread with TID " .threads->tid . " got $query\n";
    }
}
push @Enquing_threads,threads->create(\&buildQueue) for 1..5;
push @Dequeuing_threads,threads->create(\&process_Queue) for 1..3;

我面临的问题:

  • 线程未按预期同时运行。
  • 整个程序异常退出,控制台输出如下:

Perl exited with active threads: 8 running and unjoined

    0 finished and unjoined
    0 running and detached

    Enque thread with TID 5 got 6646.13585023883,Queue Size: 595 
    Enque thread with TID 1 got 3573.84104215917,Queue Size: 595

感谢任何有关代码优化的帮助。

最佳答案

此行为是预期的:当主线程退出时,所有其他线程也会退出。如果您不在乎,可以$thread->detach它们。否则,您必须手动$thread->join它们,我们将这样做。

$thread->join 等待线程完成,并获取返回值(线程可以像子例程一样返回值,尽管上下文(列表/void/标量)必须是在生成时固定)。

我们将分离排队数据的线程:

threads->create(\&buildQueue)->detach for 1..5;

现在对于出队线程,我们将它们放入一个词法变量(为什么使用全局变量?),以便我们稍后可以将它们出队:

my @dequeue_threads = map threads->create(\&process_queue), 1 .. 3;

然后等待他们完成:

$_->join for @dequeue_threads;

我们知道分离的线程将在程序退出之前完成执行,因为出队线程退出的唯一方法是耗尽队列。

除了一个半错误。您会看到,空队列和已完成队列之间存在差异。如果队列只是空的,出队线程将在 $queue->dequeue 上阻塞,直到获得一些输入。传统的解决方案是在定义它们获得的值时出列。我们可以通过在队列中提供与从队列读取的线程数量一样多的 undef 值来打破循环。更现代版本的 Thread::Queue 有一个 end 方法,该方法使所有后续调用的 dequeue 返回 undef .

问题是何时结束队列。我们应该在所有排队线程退出后执行此操作。这意味着,我们应该手动等待它们。叹息。

my @enqueueing = map threads->create(\&enqueue), 1..5;
my @dequeueing = map threads->create(\&dequeue), 1..3;
$_->join for @enqueueing;
$queue->enqueue(undef) for 1..3;
$_->join for @dequeueing;

子出队中:while(define( my $item = $queue->dequeue )) { ... }

使用define测试修复了另一个错误:rand可以返回零,尽管这种情况不太可能发生并且会通过大多数测试。 rand 的约定是,它返回一个介于零和排除某个上限之间的伪随机 float :区间 [0, x) 中的数字。绑定(bind)默认为 1

如果您不想手动加入排队线程,则可以使用信号量来发出完成信号。信号量是一种多线程原语,可以递增和递减,但不能低于零。如果递减操作会使丢弃计数低于零,则调用会阻塞,直到另一个线程提高计数。如果起始计数为1,则可以将其用作阻止资源的标志。

我们还可以从负值1 - $NUM_THREADS开始,让每个线程递增该值,这样只有当所有线程都退出时,它才能再次递减。

use threads;  # make a habit of importing `threads` as the first thing

use strict; use warnings;
use feature 'say';

use Thread::Queue;
use Thread::Semaphore;

use constant {
  NUM_ENQUEUE_THREADS => 5,  # it's good to fix the thread counts early
  NUM_DEQUEUE_THREADS => 3,
};

sub enqueue {
  my ($out_queue, $finished_semaphore) = @_;
  my $tid = threads->tid;

  # iterate over ranges instead of using the while($maxval --> 0) idiom
  for (1 .. 1000) {
    $out_queue->enqueue(my $val = rand 10_000);
    say "Thread $tid enqueued $val";
  }

  $finished_semaphore->up;
  # try a non-blocking decrement. Returns true only for the last thread exiting.
  if ($finished_semaphore->down_nb) {
    $out_queue->end;  # for sufficiently modern versions of Thread::Queue
    # $out_queue->enqueue(undef) for 1 .. NUM_DEQUEUE_THREADS;
  }
}

sub dequeue {
  my ($in_queue) = @_;
  my $tid = threads->tid;
  while(defined( my $item = $in_queue->dequeue )) {
    say "thread $tid dequeued $item";
  }
}

# create the queue and the semaphore
my $queue = Thread::Queue->new;
my $enqueuers_ended_semaphore = Thread::Semaphore->new(1 - NUM_ENQUEUE_THREADS);

# kick off the enqueueing threads -- they handle themself
threads->create(\&enqueue, $queue, $enqueuers_ended_semaphore)->detach for 1..NUM_ENQUEUE_THREADS;

# start and join the dequeuing threads
my @dequeuers = map threads->create(\&dequeue, $queue), 1 .. NUM_DEQUEUE_THREADS;
$_->join for @dequeuers;

如果线程似乎不是并行运行,而是顺序运行,请不要感到惊讶:此任务(排队随机数)非常快,并且不太适合多线程(排队比创建随机数更昂贵)数)。

这是一个示例运行,其中每个入队程序仅创建两个值:

Thread 1 enqueued 6.39390993005694
Thread 1 enqueued 0.337993319585337
Thread 2 enqueued 4.34504733960242
Thread 2 enqueued 2.89158054485114
Thread 3 enqueued 9.4947585773571
Thread 3 enqueued 3.17079715055542
Thread 4 enqueued 8.86408863197179
Thread 5 enqueued 5.13654995317669
Thread 5 enqueued 4.2210886147538
Thread 4 enqueued 6.94064174636395
thread 6 dequeued 6.39390993005694
thread 6 dequeued 0.337993319585337
thread 6 dequeued 4.34504733960242
thread 6 dequeued 2.89158054485114
thread 6 dequeued 9.4947585773571
thread 6 dequeued 3.17079715055542
thread 6 dequeued 8.86408863197179
thread 6 dequeued 5.13654995317669
thread 6 dequeued 4.2210886147538
thread 6 dequeued 6.94064174636395

您可以看到 5 成功地在 4 之前将一些内容排入队列。线程 78 无法使任何内容出队,6 太快了。此外,所有入队程序都在出队程序生成之前完成(对于如此少量的输入)。

关于multithreading - Perl队列和线程异常退出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18352106/

相关文章:

perl - parallel::forkmanager .... 如何在退出时传递 2 个变量

linux - 如何与我们的进程并行运行 Valgrind,使其性能不会下降太多?

c++ - pthread - 使用线程访问多个对象

Java内存模型和并发读

linux - 如何在 Fedora 系统中每小时运行一个 Perl 脚本。我有一个 perl 脚本,它将检查 IP 中的端口是否打开

perl - 如何在perl中在同一范围内重新声明变量?

c++ - 为什么包含 rand() 的这段 C++11 代码在多线程中比在单线程中慢?

java - ExecutorService 类型中的方法 invokeAll 不适用于参数错误

mysql - 如何使用MySQL管理服务器端进程

perl - 当月的日期跨越当月末到下月初时,如何重新排列月的日期