Perl,如何从 url 并行获取数据?

标签 perl parallel-processing fetch

我需要从许多不公开任何服务的 Web 数据提供者那里获取一些数据,所以我必须写这样的东西,例如使用 WWW::Mechanize:

use WWW::Mechanize;
@urls = ('http://www.first.data.provider.com', 'http://www.second.data.provider.com', 'http://www.third.data.provider.com');
%results = {};
foreach my $url (@urls) {
 $mech = WWW::Mechanize->new();
 $mech->get($url);
 $mech->form_number(1);
 $mech->set_fields('user' => 'myuser', pass => 'mypass');
 $resp = $mech->submit();
 $results{$url} = parse($resp->content());
}
consume(%results);

是否有一些(可能是简单的 ;-)方法可以同时将数据提取到一个公共(public)的 %results 变量,即:并行地从所有提供者那里获取数据?

最佳答案

在 Perl 中应避免使用 threadsuse threads 主要用于
在 Windows 上模拟 UNIX 风格的 fork;除此之外,毫无意义。

(如果你关心的话,这个实现让这个事实非常清楚。在 perl 中,
解释器是一个 PerlInterpreter 对象。 threads 的方式
作品是通过制作一堆线程,然后创建一个全新的
每个线程中的 PerlInterpreter 对象。线程绝对共享
什么都没有,甚至比子进程还少; fork 让你
写时复制,但使用 threads ,所有复制都在 Perl 中完成
空间!减缓!)

如果你想在同一个进程中同时做很多事情,
在 Perl 中这样做的方法是使用事件循环,例如
EV ,
Event , 或
POE ,或使用 Coro。 (你可以
还根据 AnyEvent API 编写您的代码,这将使
您使用任何事件循环。这是我更喜欢的。)区别
两者之间是您编写代码的方式。

AnyEvent(和 EV、事件、
POE 等)迫使您以面向回调的方式编写代码
风格。控制不是从上到下流动的,而是在一个
连续传递风格。函数不返回值,它们调用
其他函数及其结果。这允许您运行许多 IO
并行操作——当给定的 IO 操作产生时
结果,您处理这些结果的函数将被调用。什么时候
另一个 IO 操作完成,该函数将被调用。和
很快。

这种方法的缺点是你必须重写你的
代码。所以有一个名为 Coro 的模块,它为 Perl 提供了真实的
(用户空间)线程,让您从上到下编写代码,
但仍然是非阻塞的。 (这样做的缺点是
大量修改 Perl 的内部结构。但它似乎工作得很好。)

所以,既然我们不想重写
WWW::Mechanize
今晚,我们将使用 Coro。 Coro 带有一个名为的模块
Coro::LWP 这将使
所有对 LWP 的调用都是
非阻塞。它将阻塞当前线程(“协程”,在 Coro
lingo),但它不会阻塞任何其他线程。这意味着你可以
一次性处理大量请求,并在结果变为
可用的。并且 Coro 将比您的网络连接更好地扩展;
每个协程只使用几 k 的内存,所以很容易有几十个
周围成千上万。

考虑到这一点,让我们看一些代码。这是一个启动的程序
三个并行的 HTTP 请求,并打印每个请求的长度
回复。它类似于你在做什么,减去实际
加工;但是你可以把你的代码放在我们计算的地方
长度,它的工作原理是一样的。

我们将从通常的 Perl 脚本样板开始:

#!/usr/bin/env perl

use strict;
use warnings;

然后我们将加载 Coro 特定的模块:
use Coro;
use Coro::LWP;
use EV;

Coro 在幕后使用了一个事件循环;它会为你挑选一个,如果
你想要,但我们只会明确指定 EV。这是最好的事件
环形。

然后我们将加载我们工作所需的模块,这只是:
use WWW::Mechanize;

现在我们准备好编写我们的程序了。首先,我们需要一个 URL 列表:
my @urls = (
    'http://www.google.com/',
    'http://www.jrock.us/',
    'http://stackoverflow.com/',
);

然后我们需要一个函数来生成一个线程并完成我们的工作。做一个
Coro 上的新线程,您可以像 async 一样调用 async { body; of the thread; goes here } 。这将创建一个线程,启动它,然后
继续程序的其余部分。
sub start_thread($) {
    my $url = shift;
    return async {
        say "Starting $url";
        my $mech = WWW::Mechanize->new;
        $mech->get($url);
        printf "Done with $url, %d bytes\n", length $mech->content;
    };
}

所以这是我们程序的主要内容。我们只是把我们普通的 LWP 程序
在异步内部,它将神奇地非阻塞。 get 块,
但是其他协程会在等待它获取数据时运行
来自网络。

现在我们只需要启动线程:
start_thread $_ for @urls;

最后,我们要开始处理事件:
EV::loop;

就是这样。当你运行它时,你会看到一些输出,如:
Starting http://www.google.com/
Starting http://www.jrock.us/
Starting http://stackoverflow.com/
Done with http://www.jrock.us/, 5456 bytes
Done with http://www.google.com/, 9802 bytes
Done with http://stackoverflow.com/, 194555 bytes

如您所见,请求是并行发出的,而您没有
求助于 threads !

更新

您在原始帖子中提到要限制并行运行的 HTTP 请求的数量。一种方法是使用信号量,
Coro::Semaphore 在科罗。

信号量就像一个计数器。当您想使用信号量保护的资源时,您可以“关闭”信号量。这会减少计数器并继续运行您的程序。但是如果当您尝试关闭信号量时计数器为零,则您的线程/协程将进入休眠状态,直到它不为零。当计数再次上升时,您的线程将被唤醒,关闭信号量并继续。最后,当您使用完信号量保护的资源时,您“向上”信号量并给其他线程运行的机会。

这使您可以控制对共享资源的访问,例如“发出 HTTP 请求”。

您需要做的就是创建一个您的 HTTP 请求线程将共享的信号量:
my $sem = Coro::Semaphore->new(5);

5 的意思是“让我们在阻塞之前调用 'down' 5 次”,或者换句话说,“让有 5 个并发 HTTP 请求”。

在我们添加任何代码之前,让我们先谈谈可能出错的地方。可能发生的不好的事情是线程“向下” - 信号量,但永远不会“向上” - 完成后。那么没有任何东西可以使用该资源,并且您的程序可能最终什么也不做。有很多方法可能会发生这种情况。如果你写了一些像 $sem->down; do something; $sem->up 这样的代码,你可能会觉得很安全,但是如果“做某事”抛出异常怎么办?然后信号量将被保留下来,这很糟糕。

幸运的是,Perl 使得拥有作用域 Guard 对象变得容易,当保存对象的变量超出作用域时,它会自动运行代码。我们可以将代码设为 $sem->up ,这样我们就不必担心在我们不打算持有资源的情况下。

Coro::Semaphore 集成了守卫的概念,这意味着你可以说 my $guard = $sem->guard ,当控制流离开你调用 guard 的范围时,它会自动向下和向上传递信号量。

考虑到这一点,为了限制并行请求的数量,我们所要做的就是在使用 HTTP 的协程顶部的 guard 信号量:
async {
    say "Waiting for semaphore";
    my $guard = $sem->guard;
    say "Starting";
    ...;
    return result;
}

处理评论:

如果您不希望您的程序永远存在,则有几种选择。一种是在另一个线程中运行事件循环,然后在每个工作线程上运行 join。这也允许您将结果从线程传递到主程序:
async { EV::loop };

# start all threads
my @running = map { start_thread $_ } @urls;

# wait for each one to return
my @results = map { $_->join } @running;

for my $result (@results) {
    say $result->[0], ': ', $result->[1];
}

您的线程可以返回如下结果:
sub start_thread($) {
    return async {
        ...;
        return [$url, length $mech->content];
    }
}

这是在数据结构中收集所有结果的一种方法。如果您不想返回东西,请记住所有协程共享状态。所以你可以把:
my %results;

在程序的顶部,让每个协程更新结果:
async {
    ...;
    $results{$url} = 'whatever';
};

当所有协程运行完毕后,您的散列将填充结果。不过,您必须对每个协程进行 join 才能知道答案何时准备就绪。

最后,如果您将此作为 Web 服务的一部分执行,您应该使用协程感知 Web 服务器,例如 Corona 。这将在协程中运行每个 HTTP 请求,除了能够并行发送 HTTP 请求之外,还允许您并行处理多个 HTTP 请求。这将很好地利用内存、CPU 和网络资源,并且非常容易维护!

(您基本上可以将我们的程序从上面剪切-粘贴到处理 HTTP 请求的协程中;可以在协程中创建新的协程和 join。)

关于Perl,如何从 url 并行获取数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6814168/

相关文章:

linux - 如何在 Ubuntu 上安装 Net::SFTP 模块?

linux - 脚本在没有执行权限的情况下执行脚本

perl - 如何访问 If/else 范围之外的值

c - 是否可以更快地对 0's 1' 求和?

C++ OpenMP : nested loops where the inner iterator depends on the outer one

带有 foreach 和 fetch 的 PHP PDO

perl ssl tls 与 net::sip 的连接

parallel-processing - 线程清理程序提示意外的内存映射

javascript - React 新手 - 使用 Fetch

javascript - 以 html 形式获取帖子响应,但也包含状态