perl - Download M files using N concurrent asynchronous HTTP clients, where M is large and N is configurable

Tags: perl http asynchronous anyevent worker-pool

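I'm trying to write a script that downloads up to N files concurrently over HTTP.

I have used AnyEvent::Worker::Pool before to manage a pool of blocking tasks. I have also used AnyEvent::HTTP together with AnyEvent->condvar to manage individual non-blocking downloads.

I assumed it would be simple to combine the two approaches, using AnyEvent->condvar to make AnyEvent::HTTP::http_get appear blocking from the point of view of AnyEvent::Worker::Pool.

However, I'm getting errors I don't understand, presumably caused by implementation details of AnyEvent::Worker. Here is a heavily stripped-down version of the script that demonstrates the problem: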

use EV;
use AnyEvent 5;
use AnyEvent::Worker::Pool;
use AnyEvent::HTTP;
use 5.10.0;
use strict;

my $pool_size = 2;
my $num_jobs  = 7;

# Create a pool of $pool_size workers
my $workers = AnyEvent::Worker::Pool->new($pool_size, sub {
  my ($job) = @_;
  eval {
    my $cv = AnyEvent->condvar;
    print "worker starting download [$job] ...\n";
    http_get 'http://download.thinkbroadband.com/5MB.zip', sub {
      my ($data, $headers) = @_;
      if ($headers->{Status} =~ /^2/) { 
        print "download [$job] succeeded.\n"; 
      } else { 
        print "download [$job] failed.\n"; 
      }
      $cv->send; # notification of download complete/exit.
    };

    $cv->recv; # wait for download to complete/exit before returning to pool
  }; if ($@) {
    print "worker payload error: $@\n";
  }
  return 1;
});

# dispatch the full list of downloads
my ($need,$done) = ($num_jobs, 0);
for my $job (0 .. ($need - 1)) {
  print "dispatching job $job...\n";
  $workers->do($job, sub {
    print "worker [$job] payload threw exception: $@\n" if $@;
    print "worker [$job] payload completed successfully!\n" unless $@;
    EV::unloop if ++$done == $need;
  });
}

EV::loop; # wait here for all downloads to complete
print "We're done!\n"; # some useful code to follow here...

The demo produces the following output:

user@host:~$ ./test.pl
dispatching job 0...
dispatching job 1...
dispatching job 2...
dispatching job 3...
dispatching job 4...
dispatching job 5...
dispatching job 6...
worker starting download [0] ...
worker starting download [1] ...
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46
worker [6] payload threw exception: no worker connection
EV: error in callback (ignoring): no worker connection at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 60

^C
user@host:~$
user@host:~$
user@host:~$ download [1] failed.
unable to write results: Broken pipe at /usr/local/share/perl/5.14.2/AnyEvent/Worker.pm line 139.
  ...caught at /usr/local/share/perl/5.14.2/AnyEvent/Worker.pm line 145.

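Why AnyEvent::HTTP?

My real script uses more of AnyEvent::HTTP's features. In particular, I combine the on_body callback with Term::StatusBar to show the end user a progress bar, and I strategically "pause" inside the on_body callback so that the transfer rate stays at or below a rate the end user has configured.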
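The rate-capping "pause" described above comes down to simple arithmetic. Here is a minimal sketch, assuming the limit is an average rate measured from the start of the transfer. `throttle_delay` is a hypothetical helper. It is not part of AnyEvent::HTTP or Term::StatusBar.

```perl
use strict;
use warnings;
use List::Util qw(max);

# Hypothetical helper: seconds to wait so that the average rate since the
# transfer started does not exceed $max_rate (bytes per second).
#   $bytes   - bytes received so far
#   $elapsed - seconds since the transfer started
sub throttle_delay {
    my ($bytes, $elapsed, $max_rate) = @_;
    # Time the transfer would have taken at exactly $max_rate.
    my $min_time = $bytes / $max_rate;
    # Already slower than the limit: no wait needed.
    return max(0, $min_time - $elapsed);
}

printf "%.2f\n", throttle_delay(1_000_000, 0.5, 1_000_000);
printf "%.2f\n", throttle_delay(  500_000, 1.0, 1_000_000);
```

Inside on_body, you would pass it the running byte count and the elapsed time (for example, measured with AnyEvent->now). If the wait is then done with a blocking sleep, the whole process pauses. Every other transfer sharing the same event loop therefore stalls too, not only the throttled one.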

Feel free to suggest alternatives that provide these features (or simple ways to hack them in!).

Why AnyEvent::Worker::Pool?

I'm already familiar with it. Alternative suggestions are welcome.

Why EV?

It's fast. Again, alternative suggestions are welcome.

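Best answer

You shouldn't use AnyEvent::Worker::Pool for this task.
I'd also advise against loop-specific functions such as EV::loop and EV::unloop, because they make your code incompatible with other event loop implementations.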
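Here is a minimal sketch of that advice. A condvar's begin/end counter takes the place of EV::unloop, and recv takes the place of EV::loop, running whichever event loop AnyEvent has selected. To keep the sketch runnable offline, timers stand in for the downloads.

```perl
use strict;
use warnings;
use AnyEvent;

my $cv = AnyEvent->condvar;
my @finished;
my @timers;   # keep the watchers alive; a watcher is cancelled when destroyed

for my $job (0 .. 2) {
    $cv->begin;                          # one more outstanding job
    push @timers, AnyEvent->timer(
        after => 0.01 * ($job + 1),      # stand-in for an asynchronous download
        cb    => sub {
            push @finished, $job;
            $cv->end;                    # job done; at zero, $cv->send fires
        },
    );
}

$cv->recv;   # replaces EV::loop; returns once the counter reaches zero
print "finished: @finished\n";
```

Because recv returns by itself once every begin has been matched by an end, no loop-specific call is needed to stop the loop.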

Your code can be rewritten like this:

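use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP;

my $pool_size = 2;   # N: maximum number of concurrent downloads
my $num_jobs  = 7;   # jobs not yet started (decremented in make_job)
my $cur_job = 0;     # id of the next job to start

my $cv = AnyEvent->condvar;
$cv->begin();        # guard: keep $cv from firing while the pool is filled

# Start up to $pool_size download "chains"; each chain holds one begin().
for (1..($pool_size < $num_jobs ? $pool_size : $num_jobs)) {
    $cv->begin();
    make_job($cur_job++);
}

$cv->end();          # release the guard

sub make_job {
    my $job = shift;
    $num_jobs--;

    http_get 'http://download.thinkbroadband.com/5MB.zip', sub {
        my ($data, $headers) = @_;
        if ($headers->{Status} =~ /^2/) {
            print "download [$job] succeeded.\n";
        } else {
            print "download [$job] failed.\n";
        }

        # Reuse this slot for the next job, or close the chain if none remain.
        if ($num_jobs > 0) {
            make_job($cur_job++);
        }
        else {
            $cv->end();
        }
    };
}

$cv->recv();         # run the event loop until every chain has ended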
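The counter-based chaining above can be generalized to an explicit list of M jobs, such as a list of URLs. Below is a minimal sketch of a hypothetical, loop-agnostic helper `run_pool` (not a CPAN module). It assumes only that the start callback eventually calls `$done` exactly once per job. So that it runs offline, the demo completes jobs by hand instead of downloading anything.

```perl
use strict;
use warnings;

# Hypothetical helper: run every job in @$jobs with at most $n in flight.
# $start->($job, $done) must start the job asynchronously and call $done->()
# exactly once when it finishes; $all_done->() runs once after the last job.
sub run_pool {
    my ($n, $jobs, $start, $all_done) = @_;
    my $st = {
        n => $n, queue => [@$jobs], inflight => 0,
        start => $start, all_done => $all_done,
    };
    if (@{ $st->{queue} }) { _fill($st) }
    else                   { $all_done->() }   # nothing to do
    return;
}

# Start queued jobs until all $n slots are busy or the queue is empty.
sub _fill {
    my ($st) = @_;
    while ($st->{inflight} < $st->{n} && @{ $st->{queue} }) {
        my $job = shift @{ $st->{queue} };
        $st->{inflight}++;
        $st->{start}->($job, sub {
            $st->{inflight}--;                         # a slot is free again
            if (@{ $st->{queue} })       { _fill($st) }
            elsif ($st->{inflight} == 0) { $st->{all_done}->() }
        });
    }
    return;
}

# Offline demo: "downloads" finish only when their $done is called by hand.
my (@pending, @started);
my ($max_inflight, $finished) = (0, 0);
run_pool(2, [0 .. 6],
    sub {
        my ($job, $done) = @_;
        push @started, $job;
        push @pending, $done;
        $max_inflight = @pending if @pending > $max_inflight;
    },
    sub { $finished++ },
);
while (my $done = shift @pending) { $done->() }   # complete in FIFO order
print "started: @started; max in flight: $max_inflight; finished: $finished\n";
```

With AnyEvent::HTTP, the start callback could be `sub { my ($url, $done) = @_; http_get $url, sub { ...; $done->() } }`, and `$all_done` could be `sub { $cv->send }`, followed by `$cv->recv`. The helper never touches an event loop itself, so it works with EV or any other backend.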

A similar question, "perl - Download M files using N concurrent asynchronous HTTP clients, where M is large and N is configurable", can be found on Stack Overflow: https://stackoverflow.com/questions/20593363/
