perl - 如何 fork 和读取多个子进程?

标签 perl fork

我的主要目标是为大量对象做一些(外部依赖时间昂贵的)工作。为此,如果我直接去做,会花费很多时间。所以我决定进入并行模式并 fork 一些(4-8,让我们看看)子进程,每个子进程为较小的对象集完成工作。在主(父)进程中,我想打印出与单进程版本相同的总体进度统计信息。

但是,当我 fork 4 个子进程并在其中做一些工作时,我可以看到它们还活着,但实际上只有一个在做某事并将信息发送回父进程。

这是我到目前为止完成的代码 - 耗时的部分是用随机 usleep 模拟的,它很好地模拟了它的行为。

#!/usr/bin/env perl
use strict;
use warnings;

use DateTime;
use DateTime::Format::HTTP;
use Time::HiRes;

my @to_be_processed = (1..300000);
my @queues;
my $nprocs = 4;

my $parent_from_child;
my @child_from_parent;
my @child_to_parent;

$SIG{CHLD} = 'IGNORE';
$|=1; # autoflush

my %stat = (
    total           => scalar(@to_be_processed),
    processed       => 0,
    time_started    => [Time::HiRes::gettimeofday],
);

# divide the list into queues for each subprocess
for (my $i = 0; $i < $stat{total}; $i++ ) {
    my $queue = $i % $nprocs;
    push @{$queues[$queue]}, $to_be_processed[$i];
}

# for progress simulation
srand (time ^ $$);

for (my $proc = 0; $proc < $nprocs; $proc++) {

    # set up the pipes
    pipe $parent_from_child, $child_to_parent[$proc]        or die "pipe failed - $!";

    # fork
    defined(my $pid = fork) or die "fork failed - $!";

    if ($pid) {
        # parent
        close $child_to_parent[$proc];
        printf("[%u] parent says: child %u created with pid %u\n", $$, $proc, $pid);
    }
    else {
        # child
        close $parent_from_child;
        open(STDOUT, ">&=" . fileno($child_to_parent[$proc]))   or die "open failed - $!";

        warn(sprintf("[%u] child alive with %u entries\n", $$, scalar(@{$queues[$proc]})));

        foreach my $id (@{$queues[$proc]}) {
            printf("START: %s\n", $id);

            # simulation of progress
            my $random_microseconds = int(rand(3000000))+200000;
            warn(sprintf("[%u] child 'works' for %u microseconds", $$, $random_microseconds));
            Time::HiRes::usleep( $random_microseconds );

            printf("DONE\n")
        }
        exit(0);
    }
}

# parent: receive data from children and print overall statistics
while (<$parent_from_child>) {
    chomp(my $line = $_);

    if ($line =~ m/^START: (\S+)/) {
        my ($id) = @_;

        printf("%6u/%6u", $stat{processed}, $stat{total});
        if ($stat{time_avg}) {
            my $remaining = ($stat{total} - $stat{processed}) * $stat{time_avg};
            my $eta = DateTime->from_epoch( epoch => time + $remaining );
            $eta->set_time_zone('Europe/Berlin');
            printf(" (ETA %s)", DateTime::Format::HTTP->format_isoz($eta));
        }
        printf("\r");
    }
    elsif ($line =~ /^DONE/) {
        $stat{processed}++;
        $stat{time_processed} = Time::HiRes::tv_interval( $stat{time_started} );
        $stat{time_avg}       = $stat{time_processed} / $stat{processed};
    }
    else {
        printf("%s\n", $line);
    }
}

通常应该消除警告。 如果你运行它,你应该看到只有一个 child 工作。 我的问题是:为什么?我的错误在哪里?我怎样才能让所有人都完成这项工作?

谢谢 K.

最佳答案

你可以在strace下运行perl,你会发现你的 child 的生命相当短暂,看起来像这样:

close(3)                                = 0
ioctl(4, SNDCTL_TMR_TIMEBASE or TCGETS, 0x7fff753b3a10) = -1 EINVAL (Invalid argument)
lseek(4, 0, SEEK_CUR)                   = -1 ESPIPE (Illegal seek)
fstat(4, {st_mode=S_IFIFO|0600, st_size=0, ...}) = 0
dup2(4, 1)                              = 1
dup(4)                                  = 3
fcntl(4, F_GETFD)                       = 0x1 (flags FD_CLOEXEC)
dup2(3, 4)                              = 4
fcntl(4, F_SETFD, FD_CLOEXEC)           = 0
close(3)                                = 0
fcntl(1, F_SETFD, 0)                    = 0
write(2, "[30629] child alive with 75000 e"..., 39) = 39
brk(0x3582000)                          = 0x3582000
write(1, "START: 1\n", 9)               = -1 EPIPE (Broken pipe)
--- SIGPIPE (Broken pipe) @ 0 (0) ---

这就是为什么:

pipe $parent_from_child, $child_to_parent[$proc]        or die "pipe failed - $!";

您在管道的错误参数上使用了数组。您需要在父级中保持所有读取 端打开。相反,您设置了一个数组,以便父级可以保持所有写入端打开(但随后在您的父级 block 中,您立即关闭写入端)。因此,在下一次循环中,pipe 创建一个新句柄,并将其分配给 $parent_from_child。旧值因此不再有引用,perl 将它清理干净——意思是,它关闭文件句柄。所以你的 child 除了最后一个都死于SIGPIPE。

我认为您的印象是您可以重复使用该读取句柄并为其分配多个写入句柄。你不能。 pipe 总是生成一个新的读句柄和一个新的写句柄。

如果你真的想共享同一个读句柄(你可能不想,当两个客户端的输出交错时这会导致损坏),只需在循环之外创建一次。所有的 child 都将通过 fork 继承相同的写句柄。更有可能的是,您希望每个 child 一个,并且您必须使用 select 循环来查看哪些有可用的输出,然后读取它们。

或者,我相信 CPAN 有适合您的现成解决方案(或十个)。

关于perl - 如何 fork 和读取多个子进程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8456694/

相关文章:

linux - 我如何在 CentOS 5 上安装 Net::Arping?

c - 在 C 中使用管道在进程之间发送浮点值

c - 函数 fork()

c++ - 使用 C++ 排队多个 system() 命令

c++ - 当子进程不刷新其标准输出时如何从子进程读取标准输出?

perl - Starman 的最佳 --max-requests 设置是什么?

mysql - 为什么 Encode::decode ('UTF-8' , $var) 一切都已经在 UTF-8 中时仍然需要?

c - 将fork()的子级设置为前台,然后退出父级

perl - C :/Strawberry/perl/lib/Carp. pm 第 324 行的格式错误的 UTF-8 字符(致命)

php - 我可以在 Perl 或 PHP 中生成 SHA1 吗?