我正在尝试编写一个脚本,该脚本将通过 HTTP 同时下载最多 N
个文件。
我以前用过 AnyEvent::Worker::Pool管理阻塞任务池。我也用过 AnyEvent::HTTP结合AnyEvent->condvar单独管理非阻塞下载。
我认为结合这两种方法应该非常简单,这样 AnyEvent->condvar从 AnyEvent::Worker::Pool 的角度来看,使 AnyEvent::HTTP::http_get 看起来阻塞.
但是,我遇到了一些我不明白的错误,大概是由于 AnyEvent::Worker 的实现细节所致.这是演示该问题的真正精简版脚本:
use EV;
use AnyEvent 5;
use AnyEvent::Worker::Pool;
use AnyEvent::HTTP;
use 5.10.0;
use strict;
my $pool_size = 2;
my $num_jobs = 7;
# Create a pool of $pool_size workers
my $workers = AnyEvent::Worker::Pool->new($pool_size, sub {
my ($job) = @_;
eval {
my $cv = AnyEvent->condvar;
print "worker starting download [$job] ...\n";
http_get 'http://download.thinkbroadband.com/5MB.zip', sub {
my ($data, $headers) = @_;
if ($headers->{Status} =~ /^2/) {
print "download [$job] succeeded.\n";
} else {
print "download [$job] failed.\n";
}
$cv->send; # notification of download complete/exit.
};
$cv->recv; # wait for download to complete/exit before returning to pool
}; if ($@) {
print "worker payload error: $@\n";
}
return 1;
});
# dispatch the full list of downloads
my ($need,$done) = ($num_jobs, 0);
for my $job (0 .. ($need - 1)) {
print "dispatching job $job...\n";
$workers->do($job, sub {
print "worker [$job] payload threw exception: $@\n" if $@;
print "worker [$job] payload completed successfully!\n" unless $@;
EV::unloop if ++$done == $need;
});
}
EV::loop; # wait here for all downloads to complete
print "We're done!\n"; # some useful code to follow here...
Demo输出如下:
user@host:~$ ./test.pl
dispatching job 0...
dispatching job 1...
dispatching job 2...
dispatching job 3...
dispatching job 4...
dispatching job 5...
dispatching job 6...
worker starting download [0] ...
worker starting download [1] ...
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46
worker [6] payload threw exception: no worker connection
EV: error in callback (ignoring): no worker connection at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 60
^C
user@host:~$
user@host:~$
user@host:~$ download [1] failed.
unable to write results: Broken pipe at /usr/local/share/perl/5.14.2/AnyEvent/Worker.pm line 139.
...caught at /usr/local/share/perl/5.14.2/AnyEvent/Worker.pm line 145.
为什么 AnyEvent::HTTP ?
在我的真实脚本中,我使用了 AnyEvent::HTTP
的更多功能;特别是,我将 on_body
回调与 Term::StatusBar
相结合,为脚本的最终用户显示进度条;此外,我在 on_body
回调中策略性地“暂停”,以便我保持传输速率等于或小于最终用户预定义的速率。
请随时提出具有这些功能的替代方案(或破解它们的简单方法!)
为什么 AnyEvent::Worker::Pool ?
我已经很熟悉了。欢迎提出替代建议。
为什么 EV ?
速度很快。同样,欢迎提出替代建议。
最佳答案
您不应该为此任务使用 AnyEvent::Worker::Poll。
我会建议您不要使用 EV::loop EV::unloop 等特定于循环的功能。这会使您的代码与其他循环实现不兼容。
你的代码可以这样重写
use strict;
use AnyEvent;
use AnyEvent::HTTP;
my $pool_size = 2;
my $num_jobs = 7;
my $cur_job = 0;
my $cv = AnyEvent->condvar;
$cv->begin();
for (1..($pool_size < $num_jobs ? $pool_size : $num_jobs)) {
$cv->begin();
make_job($cur_job++);
}
$cv->end();
sub make_job {
my $job = shift;
$num_jobs--;
http_get 'http://download.thinkbroadband.com/5MB.zip', sub {
my ($data, $headers) = @_;
if ($headers->{Status} =~ /^2/) {
print "download [$job] succeeded.\n";
} else {
print "download [$job] failed.\n";
}
if ($num_jobs > 0) {
make_job($cur_job++);
}
else {
$cv->end();
}
};
}
$cv->recv();
关于perl - 使用 N 个并发异步 HTTP 客户端下载 M 个文件,其中 M 很大,N 是可配置的,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20593363/