假设我们有一个脚本,它打开一个文件,然后逐行读取它并将该行打印到终端。我们有单线程和多线程版本。
问题是两个脚本的结果输出几乎相同,但不完全相同。在多线程版本中,大约有 10 行缺少前 2 个字符。我的意思是,如果真正的行是“Stackoverflow rocks”行,我得到“ackoverflow rocks”。
我认为这与某些竞争条件有关,因为如果我调整参数以创建大量小 worker ,与使用更少和更大的 worker 相比,我会遇到更多错误。
单线程是这样的:
$file = "some/file.txt";
open (INPUT, $file) or die "Error: $!\n";
while ($line = <STDIN>) {
print $line;
}
多线程版本使用线程队列,此实现基于 @ikegami方法:
use threads qw( async );
use Thread::Queue 3.01 qw( );
use constant NUM_WORKERS => 4;
use constant WORK_UNIT_SIZE => 100000;
sub worker {
my ($job) = @_;
for (@$job) {
print $_;
}
}
my $q = Thread::Queue->new();
async { while (defined( my $job = $q->dequeue() )) { worker($job); } }
for 1..NUM_WORKERS;
my $done = 0;
while (!$done) {
my @lines;
while (@lines < WORK_UNIT_SIZE) {
my $line = <>;
if (!defined($line)) {
$done = 1;
last;
}
push @lines, $line;
}
$q->enqueue(\@lines) if @lines;
}
$q->end();
$_->join for threads->list;
最佳答案
我试过你的程序并得到了类似的(错误的)结果。我使用了 threads::shared
中的 lock
而不是 Thread::Semaphore
围绕 print
,因为它比 T::S 更易于使用,即:
use threads;
use threads::shared;
...
my $mtx : shared;
sub worker
{
my ($job) = @_;
for (@$job) {
lock($mtx); # (b)locks
print $_;
# autom. unlocked here
}
}
...
全局变量 $mtx
用作互斥量。它的值无关紧要,甚至 undef
(如此处)也可以。
仅当当前没有其他线程持有该变量的锁时,对 lock
的调用才会阻塞并返回。
当它超出范围时,它会自动解锁(从而使 lock
返回)。在这个发生的例子中
在 for
循环的每一次迭代之后;不需要额外的 {…}
block 。
现在我们已经同步了 print
调用......
但是这也不起作用,因为print
确实缓冲了I/O(好吧,只有O)。所以我强制无缓冲输出:
use threads;
use threads::shared;
...
my $mtx : shared;
$| = 1; # force unbuffered output
sub worker
{
# as above
}
...
然后就成功了。令我惊讶的是,我可以删除 lock
并且它仍然有效。也许是偶然的。请注意,如果没有缓冲,您的脚本将运行得非常慢。
我的结论是:你是suffering from buffering .
关于multithreading - 使用线程读取输入时缺少字符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42498617/