perl - How to do asynchronous www-mechanize using AnyEvent

Tags: perl asynchronous zeromq www-mechanize anyevent

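I've been doing a lot of research on this topic, and although there are some related questions, I'm really struggling to understand how to do asynchronous programming properly with AnyEvent and WWW::Mechanize. I'm trying to stick with Mechanize because it has a clean interface and built-in features I expect to need (such as getting all the images on a site). If there is no reliable, good way to do what I want, I'll start looking at AnyEvent::HTTP, but I thought I'd ask before heading in that direction.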

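I'm new to AnyEvent programming, but I've done plenty of Perl before, as well as asynchronous JavaScript/jQuery calls with callbacks. Those make sense to me, but for some reason AnyEvent + Mech just isn't clicking.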

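Below is the code I'm working on, which pulls URLs from an upstream queue. Given a URL, I want one call that pulls in all the images on the page and then fetches all of those images asynchronously.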

So the pseudocode looks something like this:

  • get a URL from the queue
  • fetch the page
  • get all the img URL links
  • make many asynchronous calls on the img URLs (e.g. store the images in a backend)
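A minimal sketch of these four steps, assuming the AnyEvent::HTTP route mentioned above as the fallback rather than WWW::Mechanize. HTML::LinkExtor stands in for Mechanize's image extraction, URLs taken from the command line stand in for the upstream queue, and extract_img_urls, store_image and process_page are hypothetical names (store_image only prints the body size). One condvar counts the page request and every image request with begin/end, so recv returns only after the last callback.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP qw(http_get);
use HTML::LinkExtor;

# Step 3: return the absolute src URL of every <img> tag in $html.
sub extract_img_urls {
    my ($html, $base) = @_;
    # No callback: links are collected; $base makes them absolute URI objects.
    my $parser = HTML::LinkExtor->new(undef, $base);
    $parser->parse($html);
    $parser->eof;
    my @urls;
    for my $link ($parser->links) {
        my ($tag, %attr) = @$link;
        push @urls, "$attr{src}" if $tag eq 'img' && defined $attr{src};
    }
    return @urls;
}

# Step 4 (hypothetical backend hook): replace with a real, non-blocking store.
sub store_image {
    my ($url, $body) = @_;
    printf "%s: %d bytes\n", $url, length $body;
}

# Steps 2-4 for one page; $done is an AnyEvent condvar shared by all jobs.
sub process_page {
    my ($page_url, $done) = @_;
    $done->begin;                     # one job for the page itself
    http_get $page_url, sub {
        my ($html, $hdr) = @_;
        if ($hdr->{Status} =~ /^2/) {
            for my $img (extract_img_urls($html, $page_url)) {
                $done->begin;         # one job per image, started concurrently
                http_get $img, sub {
                    my ($body, $h) = @_;
                    store_image($img, $body) if $h->{Status} =~ /^2/;
                    $done->end;
                };
            }
        } else {
            warn "$page_url: $hdr->{Status} $hdr->{Reason}\n";
        }
        $done->end;                   # page job done; image jobs may still run
    };
}

# Step 1 stand-in: take page URLs from the command line instead of a queue.
if (@ARGV) {
    my $done = AE::cv;
    process_page($_, $done) for @ARGV;
    $done->recv;                      # block only here, at top level
}
```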

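I've read (after researching the errors) that I can't block inside an AnyEvent callback. How do I structure my program to make the asynchronous calls without blocking?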

AE events can only be processed while an AE-aware function is blocking, so I'm using LWP::Protocol::AnyEvent::http. It replaces LWP's normal HTTP backend (Net::HTTP) with AnyEvent::HTTP, which is AE-aware.

The worker is created like this:
    my $worker = Worker->new(upstream_job_url => URI->new('tcp://127.0.0.1:5555'), run_on_create => 1);
    

The asynchronous part is the sub _recv_msg, which calls _proc_msg.

Following the ZeroMQ Perl binding docs, I already have an AnyEvent loop watching the ZeroMQ socket...

Any help is greatly appreciated!

Code:
    package Worker;
    
    use 5.12.0;
    
    use Moose;
    use AnyEvent;
    use LWP::Protocol::AnyEvent::http;
    
    use ZMQ::LibZMQ3;
    use ZMQ::Constants qw/ZMQ_PUSH ZMQ_PULL ZMQ_POLLIN ZMQ_FD/;
    
    use JSON;
    use WWW::Mechanize;
    use Carp;
    use Coro;
    use URI;
    
    
    has 'max_children' => (
        is => 'rw',
        isa => 'Int',
        required => 1,
        default => sub { 0 }
    );
    
    has 'upstream_job_url' => (
        is => 'rw',
        isa => 'URI',
        required => 1,
    );
    
    has ['uri','sink_url'] => (
        is => 'rw',
        isa => 'URI',
        required => 0,
    );
    
    has 'run_on_create' => (
        is => 'rw',
        isa => 'Bool',
        required => 1,
        default => sub { 1 }
    );
    
    has '_receiver' => (
        is => 'rw',
        isa => 'ZMQ::LibZMQ3::Socket',
        required => 0
    );
    
    sub BUILD {
        my $self = shift;
        $self->start if $self->run_on_create;
    }
    
    sub start
    {
        my $self = shift;
        $self->_init_zmq();
    
        my $fh = zmq_getsockopt( $self->_receiver, ZMQ_FD );
        my $w; $w = AnyEvent->io( fh => $fh, poll => "r", cb => sub { $self->_recv_msg } );
        AnyEvent->condvar->recv;
    }
    
    sub _init_zmq
    {   
        my $self = shift;
        my $c = zmq_init() or die "zmq_init: $!\n";
        my $recv = zmq_socket($c, ZMQ_PULL) or die "zmq_socket: $!\n";
        if( zmq_connect($recv, $self->upstream_job_url) != 0 ) {
            croak "zmq_connect: $!\n";
        }
        $self->_receiver($recv);
    }
    
    sub _recv_msg
    {
        my $self = shift;
        while(my $message = zmq_msg_data(zmq_recvmsg($self->_receiver)) ) {
            my $msg = JSON::from_json($message, {utf8 => 1});
            $self->uri(URI->new($msg->{url}));
            $self->_proc_msg;
        }
    }
    
    sub _proc_msg
    {
        my $self = shift;
        my $c = async { 
            my $ua = WWW::Mechanize->new;
            $ua->protocols_allowed(['http']); 
            print "$$ processing " . $self->uri->as_string . "... ";
            $ua->get($self->uri->as_string);
            if ($ua->success()) {
                say $ua->status . " OK";
            } else { 
                say $ua->status . " NOT OK";
            }
        }; 
        $c->join;
    }
    
    1;
    

As you can see, I'm experimenting with Coro in _proc_msg. When I tried making only the Mechanize call, I got the error
    AnyEvent::CondVar: recursive blocking wait attempted at lib/Worker.pm line 91.
    

because $mech still blocks inside the callback. I don't know how to make the Mechanize call correctly inside my callback.
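A minimal sketch of the spawn-and-return pattern, assuming Coro and Coro::AnyEvent are installed. The event callback only creates a coroutine and returns without joining, and an AnyEvent condvar's begin/end counter records when every job has finished. start_job, the job list, and the use of Coro::AnyEvent::sleep as a stand-in for $mech->get(...) (with LWP::Protocol::AnyEvent::http loaded) are illustrative assumptions, not the poster's code.

```perl
use strict;
use warnings;
use AnyEvent;
use Coro;
use Coro::AnyEvent;    # condvar waits inside a coroutine suspend only that coroutine

my @finished;          # job ids in completion order
my $all_done = AE::cv; # begin/end counter for outstanding jobs

# Safe to call from an AnyEvent callback (e.g. the ZeroMQ io watcher):
# it only schedules a coroutine and returns immediately.
sub start_job {
    my ($id, $delay) = @_;   # hypothetical job description
    $all_done->begin;
    async {
        # Stand-in for $mech->get($url) with LWP::Protocol::AnyEvent::http
        # loaded (assumption): the wait suspends this coroutine while the
        # event loop keeps running.
        Coro::AnyEvent::sleep($delay);
        push @finished, $id;
        $all_done->end;      # the last end() sends the condvar
    };
    return;                  # no ->join: joining here blocks the callback
}

# Simulate messages arriving inside an event callback.
my $w = AE::timer 0, 0, sub {
    $all_done->begin;        # guard: keep the count above zero while scheduling
    start_job(@$_) for [a => 0.3], [b => 0.1], [c => 0.2];
    $all_done->end;
};

$all_done->recv;             # main coroutine, not a callback: safe to block here
print "finished: @finished\n";
```

The timer callback plays the role of the ZeroMQ io watcher. Because it never joins, the idle coroutine stays free to run the event loop, which is what the "$Coro::IDLE blocked itself" FATAL error in the sample run complains about.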

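At ikegami's request, I've added the driver that sends the URLs. For testing purposes it just reads an RSS feed and sends the links to the workers to process. I'm mainly curious about the basic structure of AnyEvent with callbacks, but I'd be glad of help with the whole program. Here is the driver code: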
    #!/usr/local/bin/perl
    
    use strict;
    use warnings;
    use v5.12.0;
    
    use lib './lib';
    
    use Config::General;
    use Getopt::Long;
    use Carp;
    use AnyEvent;
    use AnyEvent::Feed;
    use Parallel::ForkManager;
    use ZMQ::LibZMQ3;
    use ZMQ::Constants qw(ZMQ_PUSH ZMQ_PULL);
    use JSON;
    use URI;
    use Worker;
    
    # Debug
    use Data::Dumper;
    $Data::Dumper::Deparse = 1;
    
    my $config_file = "feeds.cfg";
    
    GetOptions(
        "config|c=s" => \$config_file,   # =s: the option takes a file name
        "help|h"     => sub { usage(); exit(0); }
    );
    
    sub usage
    {
        say "TODO";
    }
    
    $SIG{INT} = sub { croak; }; $SIG{TERM} = sub { croak; };
    $SIG{CHLD} = 'IGNORE';
    
    my $conf = Config::General->new($config_file) or croak "Couldn't open config file '$config_file' $!\n";
    
    my %config = $conf->getall();
    my @readers = ();
    my @feeds = load_feeds(\%config);
    
    my $mgr = Parallel::ForkManager->new( $config{'max_download_children'} ) or croak "Can't create fork manager: $!\n";
    my $context = zmq_init() or croak "zmq_init: $!\n";
    my $sender = zmq_socket($context, ZMQ_PUSH) or die "zmq_socket: $!\n";
    
    foreach my $feed_cfg (@feeds) {
        my $reader = AnyEvent::Feed->new(url => delete $feed_cfg->{url}, %$feed_cfg);
        push(@readers, $reader); # save, don't go out of scope
    }
    
    # Fork Downloader children. These processes will look for incoming data
    # in the img_queue and download the images, storing them in nosql
    for ( 1 .. $config{'max_download_children'} ) {
        my $pid = $mgr->start; 
        if (!$pid) {
            # Child
            my $worker = Worker->new({
                upstream_job_url => URI->new('tcp://127.0.0.1:5555')
            });
            $mgr->finish;
            say "$$ exiting.";
            exit(0);
        } else {
            # Parent
            say "[forked child $pid] my pid is $$";
        }
    }
    
    if (zmq_bind($sender, 'tcp://127.0.0.1:5555') < 0) {
        croak "zmq_bind: $!\n";
    }
    
    # Event loop 
    AnyEvent->condvar->recv;
    
    sub load_feeds
    {
        my $conf = shift;
        my @feeds = ();
        foreach my $feed ( keys %{$conf->{'feeds'}} ) {
            my $feed_ref = $conf->{'feeds'};
            $feed_ref->{$feed}->{'name'} = $feed;
            $feed_ref->{$feed}->{'on_fetch'} = \&fetch_feed_cb;
            push(@feeds, $feed_ref->{$feed});   
        }
        return @feeds;
    }
    
    sub fetch_feed_cb
    {
        my ($feed_reader, $new_entries, $feed, $error) = @_;
        if (defined $error) {
            say "Error fetching feed: $error";
            return;
        }
        say "$$ checking for new feeds";
        for (@$new_entries) {
            my ($hash, $entry) = @$_;
            say "$$ sending " . $entry->link;
            zmq_send($sender, JSON::to_json( { url => $entry->link }, { pretty => 1, utf8 => 1 } ));
        }
    }
    

Here is a sample run:
    [forked child 40790] my pid is 40789
    [forked child 40791] my pid is 40789
    [forked child 40792] my pid is 40789
    40789 checking for new feeds
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/f5nNM3zYBt0/
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/Ay9V5pIpFBA/
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/5XCVvt75ppU/
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/mWprjBD3UhM/
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/NngMs9pCQew/
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/wiUsvafLGFU/
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/QMp6gnZpFcA/
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/kqUb_rpU5dE/
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/tHItKqKhGXg/
    40789 sending http://feedproxy.google.com/~r/PerlNews/~3/7LleQbVnPmE/
    FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught at lib/Worker.pm line 99.
    FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught at lib/Worker.pm line 99.
    FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught at lib/Worker.pm line 99.
    40791 processing http://feedproxy.google.com/~r/PerlNews/~3/Ay9V5pIpFBA/... 
    40790 processing http://feedproxy.google.com/~r/PerlNews/~3/f5nNM3zYBt0/... 
    40792 processing http://feedproxy.google.com/~r/PerlNews/~3/5XCVvt75ppU/... ^C at /usr/local/perls/perl5162/lib/perl5/site_perl/darwin-thread-multi-2level/AnyEvent/Loop.pm line 231.
    

If I don't explicitly do a 'use Coro;' in Worker.pm, the Coro FATAL errors are not shown. I don't know how the async part was working without producing further runtime errors.

Sample config file (feeds.cfg):
    max_download_children = 3
    <feeds>
        <feed1>
            url="http://feeds.feedburner.com/PerlNews?format=xml"   
            interval=60
        </feed1>
    </feeds>
    

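I spent more time on this today. The mistake was the way I was calling $c->join. I shouldn't do that, because I can't block inside a callback. Coro schedules the async block, and it completes when it's done. The only thing I need to ensure is that I somehow know when all the async blocks have finished, which I think I can work out. The tricky part now is this little mystery: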
    sub _recv_msg
    {
        my $self = shift;
        while(my $message = zmq_msg_data(zmq_recvmsg($self->_receiver)) ) {
            my $msg = JSON::from_json($message, {utf8 => 1});
            $self->uri(URI->new($msg->{url}));
            $self->_proc_msg;
        }
    }
    

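This while loop prevents the async { } threads in _proc_msg from ever running. If I remove the while loop and process only the first message, the coros run. If I leave the while loop in place, they never run. That's strange to me, and I haven't figured out why yet.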

Further update:

zmq_msg_recv blocks. The zmq_send in the parent can also block. ZMQ_NOBLOCK has to be used.
I split the worker and the main program into completely separate programs.
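A sketch of a non-blocking replacement for the question's _recv_msg, assuming ZMQ::LibZMQ3 and ZMQ::Constants built against libzmq 3.x (where ZMQ_DONTWAIT is the current name of the ZMQ_NOBLOCK flag) and assuming zmq_getsockopt returns the ZMQ_EVENTS bitmask as an integer. It also assumes _proc_msg is changed to take the URI as an argument; that signature is hypothetical, not the poster's.

```perl
package Worker;
use strict;
use warnings;
use JSON ();
use URI;
use ZMQ::LibZMQ3;
use ZMQ::Constants qw(ZMQ_DONTWAIT ZMQ_EVENTS ZMQ_POLLIN);

sub _recv_msg {
    my $self = shift;
    my $sock = $self->_receiver;

    # The ZMQ_FD watcher is edge-triggered: drain every message queued now,
    # checking ZMQ_EVENTS instead of waiting for the fd to fire again.
    while (zmq_getsockopt($sock, ZMQ_EVENTS) & ZMQ_POLLIN) {
        # With ZMQ_DONTWAIT this returns undef (EAGAIN) instead of blocking.
        my $zmsg = zmq_recvmsg($sock, ZMQ_DONTWAIT) or last;
        my $msg  = JSON::from_json(zmq_msg_data($zmsg), { utf8 => 1 });

        # Pass the URI along instead of storing it in $self->uri: the
        # coroutines only run after this callback returns, so a shared
        # attribute would hold the last URL by then.
        $self->_proc_msg(URI->new($msg->{url}));
    }
    return;    # hand control back to the event loop (and the coroutines)
}

1;
```

Returning from the callback is what lets the spawned coroutines run; the original loop never returned because the blocking receive waited for the next message instead.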

Best answer

You can use https://metacpan.org/pod/AnyEvent::HTTP::LWP::UserAgent for asynchronous calls.

      use AnyEvent::HTTP::LWP::UserAgent;
      use AnyEvent;
    
      my $ua = AnyEvent::HTTP::LWP::UserAgent->new;
      my @urls = (...);
      my $cv = AE::cv;
      $cv->begin;
      foreach my $url (@urls) {
          $cv->begin;
          $ua->get_async($url)->cb(sub {
              my $r = shift->recv;
              print "url $url, content " . $r->content . "\n";
              $cv->end;
          });
      }
      $cv->end;
      $cv->recv;
    

Original Stack Overflow question (perl - How to do asynchronous www-mechanize using AnyEvent): https://stackoverflow.com/questions/19085122/
