我制作了一个Perl脚本,该脚本包装了另一个工具(overlapFeatures),以便可以即时正确地转换文件格式。我正在处理的文件都是制表符分隔的表,通常为200万行左右。就其本身而言,overlapFeatures可以轻松应对这些问题。
但是我认为我正在通过一次管道输送这么多管线来导致管道锁定。我知道我需要以某种方式对此进行线程化,以便可以同时读取和写入子进程。但是我真的不明白如何在perl(或其他任何程序)中正确使用线程。据我了解,我可以使用threads
甚至IPC::run
来解决我的问题。
我最终死锁的原始脚本如下所示:
use strict;
use warnings;
use IPC::Open2;
my $infile = shift;
my $featurefile = shift;
my $command = 'overlapFeatures';
my @args = (qw (-a stdin -b), $featurefile);
my ($input, $output);
my $pid = open2($output, $input, $command, @args)
or die "Failed with error $!\n";
open (my $infh, '<', $infile) or die "Can't open $infile\n";
while (<$infh>){
# Do some format conversion...
chomp
my @cols = split /\t/;
# print a modified line to the tool
print $input join ("\t", @cols[0,2,3,1,5,4]),"\n";
}
close ($input);
while (<$output>){
# format conversion for ouput
chomp;
my @cols = split /\t/;
print join (",",@cols[0,1,2,5,3,8]),"\n";
}
close ($output);
我尝试重写脚本以按照How to filter a lot of data with IPC::Open2?的方式使用线程,如下所示:
use strict;
use warnings;
use IPC::Open2;
use threads;
my $infile = shift;
my $featurefile = shift;
my $command = 'overlapFeatures';
my @args = (qw (-a stdin -b), $featurefile);
my ($input, $output);
my $pid = open2($output, $input, $command, @args)
or die "Failed with error $!\n";
my $thread = async {
print join(",", qw(seqid start end strand read feature name)),"\n";
for(;;) {
my $line = <$output>; # should block here and wait for output?
last if !defined $line; # end of stream reached?
print STDERR "Got line $line\n";
# Do some format conversion...
chomp $line;
my @cols = split /\t/, $line;
# print a modified line to the tool
print join(",",@cols[0,1,2,5,3,8]),"\n";
}
close($output)
};
{
open (my $infh, '<', $infile) or die "Can't open $infile\n";
while (<$infh>){
# format conversion for ouput
chomp;
my @cols = split /\t/;
print $input join ("\t", @cols[0,2,3,1,5,4]),"\n";
}
close ($input);
}
$thread->join();
waitpid ($pid, 0);
但是,脚本仍然以相同的方式卡住,我也被卡住了。在这种情况下,我也无法解决如何使用
IPC::run
的问题。我究竟做错了什么?我是否误解了线程?
编辑:花费更多时间调试脚本(并从amon帮助),我发现我能够从
$output
检索行。但是,该脚本永远不会结束,并且在收到所有输出后似乎会挂起。我认为这是我现在唯一的问题。
最佳答案
这更像是长篇评论。
我在简化版本中尝试了您的代码。我删除了转换代码,使用Unix yes
命令作为无限数据源,并将输出打印到/dev/null
,因为我们当前对输出不感兴趣,但对程序的工作不感兴趣。作为您的overlapFeatures
的替代品,我使用了cat
将数据原封不动地传递。
use strict; use warnings; use IPC::Open2; use threads;
my $command = "cat";
my @args = ();
my ($input, $output);
my $pid = open2($output, $input, $command, @args)
or die "Failed with error $!\n";
my $thread = async {
print $_ while defined($_ = <$output>);
close($output)
};
{
my $c=0;
open (my $infh, "-|", "yes") or die;
open my $null, ">/dev/null" or die;
while (<$infh>){
$c++;
print $null $_;
if ($c >= 1_000_000) {
print "\n==another million==\n\n";
$c=0
}
}
close ($input);
}
$thread->join();
waitpid ($pid, 0);
一旦达到一百万行(按字面意思),我就会打印一条状态消息以断言IO仍在工作。结果
给定脚本在Perl 12.4的Ubuntu Linux上进行了测试,可以完美地工作。因此,可以合理地假设问题不在于IPC代码,而在于数据格式转换,正在包装的程序或数据量(
yes
输出字符串"1\n"
,这导致许多行的数据很少) 。(每组约2MB,每行2个字节)结论
可能是您正在运行其他配置。如果您正在运行* nix,请断言我使用的脚本也适用于您。如果不是,请明确说明此配置,然后尝试运行等效的脚本。
也有可能将包装器分成两个脚本,至少用于测试,因此您将运行类似
$ convert-to | overlapFeatures | convert-from
这会将所有IPC委派给Shell,并将断言转换正在工作并且该体系结构是可实现的。集思广益的其他不太可能的想法:
(1)何时执行
close
操作?难道是由于某种奇怪的原因,循环的一端过早退出了吗?在print STDERR "Closing down xx\n"
之前的close
可能很有趣。(2)
open2
和async
是否成功产生了它们的进程/线程并返回了控制流?偏执的我会在他们之后再放一个print STDERR
...(3)您是否从脚本中获取了任何数据,或者流过了一段时间后流变干了?
编辑
在所有写入结束均为
EOF
d之前,管道不会产生close
。因此,所有线程都应关闭未使用的所有内容:my $thread = async {
close $input;
print $_ while defined($_ = <$output>);
close($output)
};
和{
close $output;
my $c=0;
open (my $infh, "-|", "yes") or die;
open my $null, ">/dev/null" or die;
while (<$infh>){
$c++;
print $null $_;
...
关于multithreading - 使用IPC::open2传输大文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12532402/