multithreading - 使用IPC::open2传输大文件

标签 multithreading perl

我制作了一个Perl脚本,该脚本包装了另一个工具(overlapFeatures),以便可以即时正确地转换文件格式。我正在处理的文件都是制表符分隔的表,通常为200万行左右。就其本身而言,overlapFeatures可以轻松应对这些问题。

但是我认为我正在通过一次管道输送这么多管线来导致管道锁定。我知道我需要以某种方式对此进行线程化,以便可以同时读取和写入子进程。但是我真的不明白如何在perl(或其他任何程序)中正确使用线程。据我了解,我可以使用threads甚至IPC::run来解决我的问题。

我最终死锁的原始脚本如下所示:

use strict;
use warnings;
use IPC::Open2;

my $infile = shift;
my $featurefile = shift;

my $command = 'overlapFeatures';
my @args = (qw (-a stdin -b), $featurefile);

my ($input, $output);
my $pid = open2($output, $input, $command, @args) 
    or die "Failed with error $!\n";

open (my $infh, '<', $infile) or die "Can't open $infile\n";
while (<$infh>){
    # Do some format conversion...
    chomp
    my @cols = split /\t/;
    # print a modified line to the tool
    print $input join ("\t", @cols[0,2,3,1,5,4]),"\n";
}
close ($input);

while (<$output>){
    # format conversion for ouput
    chomp;
    my @cols = split /\t/;
    print join (",",@cols[0,1,2,5,3,8]),"\n";
}
close ($output);

我尝试重写脚本以按照How to filter a lot of data with IPC::Open2?的方式使用线程,如下所示:
use strict;
use warnings;
use IPC::Open2;
use threads;

my $infile = shift;
my $featurefile = shift;

my $command = 'overlapFeatures';
my @args = (qw (-a stdin -b), $featurefile);

my ($input, $output);
my $pid = open2($output, $input, $command, @args) 
    or die "Failed with error $!\n";

my $thread = async {
    print join(",", qw(seqid start end strand read feature name)),"\n";
    for(;;) {
        my $line = <$output>; # should block here and wait for output?
        last if !defined $line; # end of stream reached?
        print STDERR "Got line $line\n";
        # Do some format conversion...
        chomp $line;
        my @cols = split /\t/, $line;
        # print a modified line to the tool
        print join(",",@cols[0,1,2,5,3,8]),"\n";
    }
    close($output)
};

{
    open (my $infh, '<', $infile) or die "Can't open $infile\n";
    while (<$infh>){
        # format conversion for ouput
        chomp;
        my @cols = split /\t/;
        print $input join ("\t", @cols[0,2,3,1,5,4]),"\n";
    }
    close ($input);
}

$thread->join();
waitpid ($pid, 0);

但是,脚本仍然以相同的方式卡住,我也被卡住了。在这种情况下,我也无法解决如何使用IPC::run的问题。

我究竟做错了什么?我是否误解了线程?

编辑:花费更多时间调试脚本(并从amon帮助),我发现我能够从$output检索行。但是,该脚本永远不会结束,并且在收到所有输出后似乎会挂起。我认为这是我现在唯一的问题。

最佳答案

这更像是长篇评论。
我在简化版本中尝试了您的代码。我删除了转换代码,使用Unix yes命令作为无限数据源,并将输出打印到/dev/null,因为我们当前对输出不感兴趣,但对程序的工作不感兴趣。作为您的overlapFeatures的替代品,我使用了cat将数据原封不动地传递。

use strict; use warnings; use IPC::Open2; use threads;

my $command = "cat";
my @args = ();

my ($input, $output);
my $pid = open2($output, $input, $command, @args) 
  or die "Failed with error $!\n";

my $thread = async {
  print $_ while defined($_ = <$output>);
  close($output)
};

{
  my $c=0;
  open (my $infh, "-|", "yes") or die;
  open my $null, ">/dev/null" or die;
  while (<$infh>){
    $c++;
    print $null $_;
    if ($c >= 1_000_000) {
      print "\n==another million==\n\n";
      $c=0
    }
  }
  close ($input);
}

$thread->join();
waitpid ($pid, 0);
一旦达到一百万行(按字面意思),我就会打印一条状态消息以断言IO仍在工作。
结果
给定脚本在Perl 12.4的Ubuntu Linux上进行了测试,可以完美地工作。因此,可以合理地假设问题不在于IPC代码,而在于数据格式转换,正在包装的程序或数据量(yes输出字符串"1\n",这导致许多行的数据很少) 。(每组约2MB,每行2个字节)
结论
可能是您正在运行其他配置。如果您正在运行* nix,请断言我使用的脚本也适用于您。如果不是,请明确说明此配置,然后尝试运行等效的脚本。
也有可能将包装器分成两个脚本,至少用于测试,因此您将运行类似
$ convert-to | overlapFeatures | convert-from
这会将所有IPC委派给Shell,并将断言转换正在工作并且该体系结构是可实现的。
集思广益的其他不太可能的想法:
(1)何时执行close操作?难道是由于某种奇怪的原因,循环的一端过早退出了吗?在print STDERR "Closing down xx\n"之前的close可能很有趣。
(2)open2async是否成功产生了它们的进程/线程并返回了控制流?偏执的我会在他们之后再放一个print STDERR ...
(3)您是否从脚本中获取了任何数据,或者流过了一段时间后流变干了?
编辑
在所有写入结束均为EOF d之前,管道不会产生close。因此,所有线程都应关闭未使用的所有内容:
my $thread = async {
  close $input;
  print $_ while defined($_ = <$output>);
  close($output)
};
{
  close $output;
  my $c=0;
  open (my $infh, "-|", "yes") or die;
  open my $null, ">/dev/null" or die;
  while (<$infh>){
    $c++;
    print $null $_;
    ...

关于multithreading - 使用IPC::open2传输大文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12532402/

相关文章:

java - 如何使用 Java 线程运行 javafx 组件?

c# - 向 DataGrid 添加大量项目时 UI 卡住

perl - 在 Perl 中使用 tk 运行重复任务

perl - 使用 HTTPS 的 Perl LWP 的内存泄漏

perl - url语言前缀的催化剂最佳方式?

android - C++计算套接字传输速度

python - 尝试使用多线程并行化 python 算法并避免 GIL 限制

java - 为什么我的程序不等待线程中的动画?

regex - 如何使用 linux regex 命令将行乘以范围

perl - perl 中的 -T 选项