perl - Parent process doesn't wait for child processes to finish despite reaping

Tags: perl multiprocessing fork posix waitpid

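I am fully aware that there are plenty of articles explaining the inner workings of parent-child process dynamics. I have gone through them and got my program working almost the way I want. But one thing keeps bugging me, and I cannot understand it despite multiple attempts.

Problem: despite reaping the children, main does not wait for all of them to finish and exits prematurely. I believe I exit from the child processes properly, and I have installed the REAPER for the child processes, so how can main exit before the children finish?

I'm not looking for a ready-made solution here, but I need a new direction to work on over the coming week. As of now I feel I have exhausted my options; I have tried many, many things, all to no avail.

Some background on what I am trying to achieve:

In short, I want all children to finish, and only then do I want to go on with further work. Each child process spawns a bunch of threads, which that child process joins properly before it exits with exit(0).

The extra machinery you may notice in the program comes from our requirement: we are to hit 5 APIs (engines), but only with a fixed batch size, say 10 requests per engine at a time. I launch a child process for each engine and a thread for each request; I then wait for all threads to finish and join them, and only then does the child process exit. Only now can I submit the next batch of requests to the same engine, and I do this for all engines until I exhaust my total set of requests, say 10000.

Each request can take anywhere between 1 second and 2 hours; basically they are CSV reports fetched from an HTTP API.

My issue is that once I have exhausted the total set of requests, I cannot make MAIN wait until all child processes have finished. This is weird, and it is the issue I am trying to tackle.

Any ideas?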

My program's output:

[compuser@lenovoe470:little-stuff]$  perl 07--20190526-batch-processing-using-threads-with-busy-pool-detection-2.pl 12
26710: STARTING TASKS IN BATCHES
26710: RUNNING batch_engine 1_e1 tasks (1 2)
26710: RUNNING batch_engine 2_e2 tasks (3 4)
26710: RUNNING batch_engine 3_e3 tasks (5 6 7)
26710: BUSY_ENGINE: e1.
26710: BUSY_ENGINE: e2.
26710: BUSY_ENGINE: e3.
26710: BUSY_ENGINE: e1.
26710: BUSY_ENGINE: e2.
26710:26712: TASK_ORCHESTRATOR: >> finished batch_engine (2_e2) tasks (3 4)
26710: PID (26712) has finished with status (0). updating proc hash
26710: BUSY_ENGINE: e3.
26710:26713: TASK_ORCHESTRATOR: >> finished batch_engine (3_e3) tasks (5 6 7)
26710:26711: TASK_ORCHESTRATOR: >> finished batch_engine (1_e1) tasks (1 2)
26710: PID (26713) has finished with status (0). updating proc hash
26710: BUSY_ENGINE: e1.
26710: PID (26711) has finished with status (0). updating proc hash
26710: RUNNING batch_engine 4_e2 tasks (8 9)
26710: RUNNING batch_engine 5_e3 tasks (10 11 12)
26710: FINISHED TASKS IN BATCHES
[compuser@lenovoe470:little-stuff]$  1:26722: TASK_ORCHESTRATOR: >> finished batch_engine (5_e3) tasks (10 11 12)
1:26721: TASK_ORCHESTRATOR: >> finished batch_engine (4_e2) tasks (8 9)

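In the output above:

  • RUNNING batch_engine means that I am running a batch of numbered tasks.
  • BUSY_ENGINE means that the endpoint/engine is busy because it is already processing the maximum batch size of requests. I need to wait.
  • finished batch_engine means that the child process has finished processing the given batch of requests for a particular engine/endpoint. It exits, and main detects that the engine is now free and the next batch can be enqueued.
  • Looking at the last two lines, it is clear that the child processes' output has spilled over past the shell prompt: main exited early without waiting for the running children. Why? Any help?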

My program:

#!/usr/bin/env perl

use strict;
use warnings;
use Data::Dumper;
use POSIX ':sys_wait_h';
use Thread qw(async);


STDOUT->autoflush(1);


# doesn't work
  sub reaper {
    my $reaped;
    while (($reaped = waitpid (-1,&WNOHANG) > 0)) {
      print "$$: reaped: $reaped\n";
      sleep(1);
    }
    $SIG{CHLD} = \&reaper;
  }
# doesn't work


my @total_tasks = (1 .. shift || 9);
my @engines = (qw/e1 e2 e3/);
my $sizes = { e1 => 2, e2 => 2, e3 => 3, };

my $proc_hash;
my $global_string = "ENGINE";

# source: https://duyanghao.github.io/ways_avoid_zombie_process/
#
  sub REAPER {
    local ($!, $?);
    while ( (my $reaped_pid = waitpid(-1, WNOHANG)) > 0 ) {
      if ( WIFEXITED($?) ) 
      {
        # my
        my $ret_code = WEXITSTATUS($?);
        print "$$: PID ($reaped_pid) has finished with status ($ret_code). updating proc hash\n";
        my $engine_name = $proc_hash->{$reaped_pid};
        delete ($proc_hash->{$reaped_pid});
        delete ($proc_hash->{$engine_name});
        # my

        # original
        #my $ret_code = WEXITSTATUS($?);
        #print "child process:$pid exit with code:$ret_code\n";
        # original
      }
    }
  }
#

$SIG{CHLD} = \&REAPER;

sub random_sleep_time {
  return (int(rand(5)+1))
  #return (sprintf "%.2f",(rand(1)+1))
}

sub task_runner {
  my @args = @_;
  my ($batch_engine, $task) = ($args[0]->[0],$args[0]->[1]);
  STDOUT->autoflush(1);
  my $task_time = random_sleep_time();
  sleep ($task_time);
  threads->exit(0);
  #print "$$:".(threads->tid()).": TASK_RUNNER: $global_string ($batch_engine) task ($task) finished in $task_time seconds\n";
  #return;
};

sub task_orchestrator {
  my ($batch_engine, @tasks) = @_;
  my $engine = (split (/_/,$batch_engine))[1];
  my $task_orch_pid = fork();
  die "Failed to fork task_orchestrator\n" if not defined $task_orch_pid;

  if ($task_orch_pid != 0) {
    $proc_hash->{$engine} = $task_orch_pid;
    $proc_hash->{$task_orch_pid} = $engine;
  }

  if ($task_orch_pid == 0) {
    STDOUT->autoflush(1);
    my @tids;
    for (my $i=1 ; $i <= $#tasks ; $i++) { push (@tids,$i) }
    foreach my $task_number (0 .. $#tasks) { 
      $tids [$task_number] = threads->create (
        \&task_runner,[$batch_engine,$tasks [$task_number]]
      );
    }
    my $ppid = getppid();
    foreach my $tid (@tids) {$tid->join()}
    print "$ppid:$$: TASK_ORCHESTRATOR: >> finished batch_engine ($batch_engine) tasks (@tasks)\n";
    exit (0);
  }
}

sub update_proc_hash {
  my $finished_pid = waitpid (-1, POSIX->WNOHANG);
  if ($finished_pid > 0) {
    print "$$: PID ($finished_pid) has finished. updating proc hash\n";
    my $engine_name = $proc_hash->{$finished_pid};
    delete ($proc_hash->{$finished_pid});
    delete ($proc_hash->{$engine_name});
  }
}

my $batch=1;
print "$$: STARTING TASKS IN BATCHES\n";
while (@total_tasks) {
  foreach my $engine (@engines) {
    update_proc_hash();
    if (exists $proc_hash->{$engine}) {
      print "$$: BUSY_ENGINE: $engine.\n";
      sleep (1);
      next;
    }
    else {
      my @engine_tasks;
      my $engine_max_tasks = $sizes->{$engine};
      while ($engine_max_tasks-- != 0) {
        my $task = shift @total_tasks;
        push (@engine_tasks,$task) if $task;
      }
      if (@engine_tasks) {
        my $batch_engine = $batch.'_'.$engine;
        print "$$: RUNNING batch_engine $batch_engine tasks (@engine_tasks)\n";
        task_orchestrator ("$batch_engine",@engine_tasks);
        $batch++;
      }
    }
  }
}

REAPER();

print "$$: FINISHED TASKS IN BATCHES\n";

__END__

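Update after 3 days: thank you, community. Once again, I am grateful to everyone who took the time to look into this and helped find and fix the problem. Thank you so much.

Let me share the new output and the final program for everyone's reference.

Output with the fix: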

User@Host:/cygdrive/c/bash-home> perl test.pl
22044: STARTING TASKS IN BATCHES
22044: MAIN: engine (e1) is RUNNING batch #1 tasks: (1 2)
22044: MAIN: engine (e2) is RUNNING batch #2 tasks: (3 4 5)
22044: MAIN: engine (e3) is RUNNING batch #3 tasks: (6 7)
41456: TASK_RUNNER: engine (e1) finished batch #1 task #1 in (1.80) seconds
41456: TASK_RUNNER: engine (e1) finished batch #1 task #2 in (1.31) seconds
41456: TASK_ORCHESTRATOR: engine (e1) finished batch #1 tasks in (1.00) seconds.
22044: REAPER: TASK_ORCHESTRATOR pid (41456) has finished with status (0).
18252: TASK_RUNNER: engine (e2) finished batch #2 task #3 in (1.04) seconds
18252: TASK_RUNNER: engine (e2) finished batch #2 task #4 in (1.91) seconds
18252: TASK_RUNNER: engine (e2) finished batch #2 task #5 in (1.63) seconds
18252: TASK_ORCHESTRATOR: engine (e2) finished batch #2 tasks in (1.00) seconds.
22044: REAPER: TASK_ORCHESTRATOR pid (18252) has finished with status (0).
14544: TASK_RUNNER: engine (e3) finished batch #3 task #6 in (1.42) seconds
14544: TASK_RUNNER: engine (e3) finished batch #3 task #7 in (1.84) seconds
14544: TASK_ORCHESTRATOR: engine (e3) finished batch #3 tasks in (1.00) seconds.
22044: REAPER: TASK_ORCHESTRATOR pid (14544) has finished with status (0).
22044: MAIN: engine (e1) is RUNNING batch #4 tasks: (8 9)
22044: MAIN: engine (e2) is RUNNING batch #5 tasks: (10)
37612: TASK_RUNNER: engine (e1) finished batch #4 task #8 in (1.19) seconds
37612: TASK_RUNNER: engine (e1) finished batch #4 task #9 in (1.31) seconds
37612: TASK_ORCHESTRATOR: engine (e1) finished batch #4 tasks in (1.00) seconds.
16300: TASK_RUNNER: engine (e2) finished batch #5 task #10 in (1.53) seconds
16300: TASK_ORCHESTRATOR: engine (e2) finished batch #5 tasks in (1.00) seconds.
22044: ALL ORCHESTRATORS HAVE FINISHED
22044: FINISHED TASKS IN BATCHES

Final working program:

#!/usr/bin/env perl

use strict;
use warnings;
use Data::Dumper;
use POSIX ':sys_wait_h';
use threads;

STDOUT->autoflush(1);

my @total_tasks = (1 .. 10);
my $sleep_time = 1;
my @engines = (qw/e1 e2 e3/);
my $sizes = {
  e1 => 2,
  e2 => 3,
  e3 => 2,
};

my $proc_hash;
my $global_string = "engine";

sub REAPER {
  local ($!, $?);
  while ( (my $reaped_pid = waitpid(-1, WNOHANG)) > 0 ) {
    if ( WIFEXITED($?) ) {
      my $ret_code = WEXITSTATUS($?);
      print "$$: REAPER: TASK_ORCHESTRATOR pid ($reaped_pid) has finished with status ($ret_code).\n";
      my $engine_name = $proc_hash->{$reaped_pid};
      delete ($proc_hash->{$reaped_pid});
      delete ($proc_hash->{$engine_name});
    }
  }
}

$SIG{CHLD} = \&REAPER;

sub random_sleep_time { return sprintf ("%.2f",(rand ($sleep_time||5) + 1)) }

sub task_runner {
  STDOUT->autoflush(1);
  my @args = @_;
  my ($batch_engine, $task) = ($args[0]->[0],$args[0]->[1]);
  my ($batch, $engine) = split (/_/,$batch_engine);
  my $task_time = random_sleep_time();
  sleep ($task_time);
  print "$$: TASK_RUNNER: $global_string ($engine) finished batch #$batch task #$task in ($task_time) seconds\n";
  threads->exit(0);
};

sub task_orchestrator {
  my ($batch_engine, @tasks) = @_;
  my ($batch, $engine) = split (/_/,$batch_engine);
  my $task_orch_pid = fork();
  die "Failed to fork task_orchestrator\n" if not defined $task_orch_pid;

  if ($task_orch_pid != 0) {
    $proc_hash->{$engine} = $task_orch_pid;
    $proc_hash->{$task_orch_pid} = $engine;
  }

  if ($task_orch_pid == 0) {
    STDOUT->autoflush(1);
    my @tids;
    my $start_time = time;
    for (my $i=1 ; $i <= $#tasks ; $i++) { push (@tids,$i) }
    foreach my $task_number (0 .. $#tasks) {
      $tids [$task_number] = threads->create (
        \&task_runner,[$batch_engine,$tasks [$task_number]]
      );
    }
    foreach my $tid (@tids) {$tid->join()}
    my $end_time = time;
    my $total_time = sprintf ("%.2f",($end_time - $start_time));
    print "$$: TASK_ORCHESTRATOR: engine ($engine) finished batch #$batch tasks in ($total_time) seconds.\n";
    exit (0);
  }
}

my $batch=1;
print "$$: STARTING TASKS IN BATCHES\n";
while (@total_tasks)
{
  foreach my $engine (@engines)
  {
    if (exists $proc_hash->{$engine})
    {
      sleep (1);
      next;
    }
    else
    {
      my @engine_tasks;
      my $engine_max_tasks = $sizes->{$engine};
      while ($engine_max_tasks-- != 0)
      {
        my $task = shift @total_tasks;
        push (@engine_tasks,$task) if $task;
      }
      if (@engine_tasks)
      {
        my $batch_engine = $batch.'_'.$engine;
        print "$$: MAIN: engine ($engine) is RUNNING batch #$batch tasks: (@engine_tasks)\n";
        task_orchestrator ($batch_engine,@engine_tasks);
        $batch++;
      }
    }
  }
}

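# All 3 below work properly.
# Note: without Time::HiRes, the built-in sleep truncates .2 to 0, so this
# loop polls without pausing; "use Time::HiRes qw(sleep);" gives a real 0.2 s delay.
#sleep (.2) while ((waitpid(-1, WNOHANG)) >= 0);
#sleep (.2) while ((waitpid(-1, WNOHANG)) != -1);
sleep (.2) while ((waitpid(-1, WNOHANG)) > -1);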

print "$$: ALL ORCHESTRATORS HAVE FINISHED\n";
print "$$: FINISHED TASKS IN BATCHES\n";
__END__

Best answer

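As the waitpid documentation puts it, a non-blocking waitpid (with WNOHANG in FLAGS) "can return 0 if there are child processes matching PID but none have terminated yet".

With -1 this applies to any child process, so code with multiple children is certain to get a zero return from the non-blocking waitpid in REAPER; that is precisely how we can tell that there are still unterminated children to wait for. But your while loop exits at the first such zero. In particular, the final explicit REAPER() call returns right away while the last batches are still running, and main then carries on and exits.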
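The three possible returns can be seen side by side in a minimal sketch. The helper name waitpid_return_values is made up for this illustration, and the sketch assumes the process has no other children and a system where SIGCHLD has its default disposition:

```perl
use strict;
use warnings;
use POSIX ':sys_wait_h';

$| = 1;   # unbuffered output, so forked children never re-flush parent text

# Hypothetical helper (not from the question): fork one short-lived child and
# record what waitpid returns while it runs, when it exits, and afterwards.
sub waitpid_return_values {
    local $SIG{CHLD} = 'DEFAULT';               # assumption: no auto-reaping in effect
    my $pid = fork // die "Can't fork: $!";
    if ($pid == 0) {
        sleep 1;                                # child: still running when the parent polls
        exit 0;
    }
    my $while_running = waitpid(-1, WNOHANG);   # 0: a child exists but none has exited yet
    my $after_exit    = waitpid($pid, 0);       # blocks, then returns the child's PID
    my $no_children   = waitpid(-1, WNOHANG);   # -1: nothing left to wait for
    return ($pid, $while_running, $after_exit, $no_children);
}

my ($pid, @seen) = waitpid_return_values();
print "child $pid: while running=$seen[0], after exit=$seen[1], none left=$seen[2]\n";
```

A loop that stops on "> 0" therefore ends at the first value, while a loop that stops only on -1 keeps going until the last child is gone.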

One way to fix this is to keep polling for as long as the return is non-negative:

use warnings;
use strict;
use feature 'say';

use POSIX ':sys_wait_h';
use Time::HiRes qw(sleep) ;

for (1..4) { 
    my $pid = fork // die "Can't fork: $!";
    if ($pid == 0) { 
        sleep rand 4;  
        say "\tkid $$ exiting"; 
        exit;
    };  
}; 

while ( (my $kid = waitpid -1, WNOHANG) > -1 ) { 
    say "got $kid" if $kid > 0;
    sleep 0.2;
}

Prints

        kid 12687 exiting
got 12687
        kid 12690 exiting
got 12690
        kid 12689 exiting
got 12689
        kid 12688 exiting
got 12688

Adjust the polling period as suitable. Note that since this catches any child process, it can interfere with other forks if any of them are still unwaited-for at that point.

Or you can block in the wait:

while ( (my $kid = waitpid -1, 0) > -1 ) { 
    say "got $kid";
}

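Here you can also test for > 0, since there is no 0 return when the call blocks. Still, we only need to end the loop on the -1 return (no more processes), just as before.

The main difference is that the loop body runs only once a child process has actually exited. So if you need to keep an eye on what some long-running children are doing (and perhaps limit their running time or guard against hung jobs), this form makes that harder; you would want a non-blocking operation.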
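A non-blocking loop of that kind might look like the sketch below. The helper run_with_timeout, the per-child time limit, and the choice of SIGTERM are assumptions made for illustration; none of them appear in the question:

```perl
use strict;
use warnings;
use POSIX ':sys_wait_h';
use Time::HiRes qw(sleep time);

$| = 1;   # unbuffered output, so forked children never re-flush parent text

# Hypothetical helper: run each job (a code ref) in its own child, poll with
# WNOHANG, and send SIGTERM to any child that runs longer than $timeout seconds.
# Returns { pid => 'exited' | 'killed' }.
sub run_with_timeout {
    my ($timeout, @jobs) = @_;
    local $SIG{CHLD} = 'DEFAULT';
    my (%started, %result);
    for my $job (@jobs) {
        my $pid = fork // die "Can't fork: $!";
        if ($pid == 0) { $job->(); exit 0 }
        $started{$pid} = time();
    }
    while (%started) {
        my $kid = waitpid(-1, WNOHANG);
        last if $kid == -1;                      # no children left at all
        if ($kid > 0) {
            next unless exists $started{$kid};   # not one of ours; ignore it
            $result{$kid} = WIFSIGNALED($?) ? 'killed' : 'exited';
            delete $started{$kid};
            next;
        }
        # $kid == 0: children are still running, so check their deadlines
        for my $pid (keys %started) {
            kill 'TERM', $pid if time() - $started{$pid} > $timeout;
        }
        sleep 0.1;                               # polling period
    }
    return \%result;
}

my $result = run_with_timeout(1, sub { sleep 0.2 }, sub { sleep 30 });
print "$_: $result->{$_}\n" for sort keys %$result;
```

The zero return is what gives the parent a chance to do other work between polls, here the deadline check; a blocking waitpid would sit inside the call until some child exits.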

Note that some details, in particular those related to return values, may differ across systems.


A simpler version of this is to wait only for those specific PIDs, collected as you fork:

foreach my $pid (@pids) {
    my $gone = waitpid $pid, 0;
    say "Process $gone exited with $?" if $gone > 0;  # -1 if reaped already
}

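Each process is waited for with a blocking waitpid. The problem with this is that if one process runs much longer than the others (or hangs), the loop is stuck waiting on it. And, in general, we would rather reap child processes as they exit than in the order in which they were started.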
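The two ideas can be combined: track the PIDs you forked, but reap with a blocking waitpid on -1 so that children are collected in the order they exit. This is only a sketch; the helper name reap_in_exit_order and the way the children pick their sleep time and exit code are assumptions for illustration:

```perl
use strict;
use warnings;
use POSIX ':sys_wait_h';

$| = 1;   # unbuffered output, so forked children never re-flush parent text

# Hypothetical helper: fork one child per exit code, then reap the children in
# whatever order they finish. Returns { pid => exit status } for our children.
sub reap_in_exit_order {
    my @exit_codes = @_;
    local $SIG{CHLD} = 'DEFAULT';
    my (%pending, %status);
    for my $code (@exit_codes) {
        my $pid = fork // die "Can't fork: $!";
        if ($pid == 0) { sleep 1 + int rand 2; exit $code }
        $pending{$pid} = 1;
    }
    while (%pending) {
        my $kid = waitpid(-1, 0);            # blocks until any child exits
        last if $kid == -1;                  # no children left
        next unless delete $pending{$kid};   # a child started elsewhere; skip it
        $status{$kid} = WEXITSTATUS($?);
    }
    return \%status;
}

my $status = reap_in_exit_order(0, 3, 7);
print "pid $_ exited with $status->{$_}\n" for sort { $a <=> $b } keys %$status;
```

Because the loop ends when the set of tracked PIDs is empty, a slow child delays only its own entry, and unrelated children reaped along the way are simply skipped.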

Regarding "perl - Parent process doesn't wait for child processes to finish despite reaping", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/56327077/
