perl - AnyEvent::RabbitMQ 关闭 channel 的问题

标签 perl rabbitmq message-queue anyevent

我正在编写一个用于将消息发布到消息队列(RabbitMQ)的主程序。该程序用 Perl 5 编写,并使用 AnyEvent::RabbitMQ 与 RabbitMQ 通信。

以下最小示例(用于重现我遇到的问题)在通过同一 channel 发送第二个命令时会失败,并报错“Channel closed”(channel 已关闭)。

use strictures 2;

use AnyEvent::RabbitMQ;

main();    # Script entry point: run the publisher defined in sub main.

############################################################################
sub main {    # Connect to RabbitMQ, wait until a handler signals the condvar, then close.
  _log( debug => 'main' );
  my $condvar = AnyEvent->condvar;    # signalled by _error() via ->send
  my $ar      = AnyEvent::RabbitMQ->new;
  $ar->load_xml_spec;
  _log( debug => 'Connecting to RabbitMQ...' );
  $ar->connect(
    host            => 'localhost',
    port            => 5672,
    user            => 'guest',
    pass            => 'guest',
    vhost           => '/',
    timeout         => 1,
    tls             => 0,
    on_success      => sub { _on_connect_success( $condvar, $ar, @_ ) },    # next step: open a channel
    on_failure      => sub { _error( $condvar, $ar, 'failure', @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return', @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close', @_ ) },
  );
  $condvar->recv;    # wait here until $condvar->send is called
  $ar->close;
  return;
}

############################################################################
sub _on_connect_success {    # connect() success handler: go on to open a channel.
  my ( $condvar, $ar, $new_ar ) = @_;    # $new_ar: first argument passed to the on_success callback
  _log( debug => 'Connected to RabbitMQ.' );
  _open_channel( $condvar, $new_ar );    # the channel is opened on $new_ar; $ar is not used here
  return;
}

############################################################################
sub _open_channel {    # Ask the connection $ar for a new channel.
  my ( $condvar, $ar ) = @_;
  _log( debug => 'Opening RabbitMQ channel...' );
  $ar->open_channel(
    on_success => sub { _on_open_channel_success( $condvar, $ar, @_ ) },    # next step: declare the queue
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },    # every other callback ends in _error()
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
sub _on_open_channel_success {    # open_channel() success handler: go on to declare the queue.
  my ( $condvar, $ar, $channel ) = @_;    # $channel: first argument passed to the on_success callback
  _log( debug => 'Opened RabbitMQ channel.' );
  _declare_queue( $condvar, $ar, $channel );
  return;
}

############################################################################
sub _declare_queue {    # Declare the queue 'test' on $channel.
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Declaring RabbitMQ queue...' );
  $channel->declare_queue(
    queue       => 'test',
    auto_delete => 1,
    passive     => 0,
    durable     => 0,
    exclusive   => 0,
    no_ack      => 1,
    ticket      => 0,
    on_success =>
      sub { _on_declare_queue_success( $condvar, $ar, $channel, @_ ) },    # next step: bind the queue
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },    # every other callback ends in _error()
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
sub _on_declare_queue_success {    # declare_queue() success handler: go on to bind the queue.
  my ( $condvar, $ar, $channel ) = @_;    # any further callback arguments are ignored
  _log( debug => 'Declared RabbitMQ queue.' );
  _bind_queue( $condvar, $ar, $channel );
  return;
}

############################################################################
sub _bind_queue {    # Bind the queue 'test' with an empty exchange name and an empty routing key.
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Binding RabbitMQ queue...' );
  $channel->bind_queue(
    queue       => 'test',
    exchange    => '',    # empty exchange name
    routing_key => '',
    on_success => sub { _on_bind_queue_success( $condvar, $ar, $channel, @_ ) },    # next step: publish
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },    # every other callback ends in _error()
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
sub _on_bind_queue_success {    # bind_queue() success handler: setup is done, publish the first message.
  my ( $condvar, $ar, $channel ) = @_;    # any further callback arguments are ignored
  _log( debug => 'Binded RabbitMQ queue.' );
  _log( info  => 'Master ready to publish messages.' );
  _publish_message( $condvar, $ar, $channel, 'Hello, world!' );
  return;
}

############################################################################
sub _publish_message {    # Publish $message on $channel with an empty exchange name and routing key.
  my ( $condvar, $ar, $channel, $message ) = @_;
  _log( debug => "Publishing RabbitMQ message ($message)..." );
  $channel->publish(
    queue       => 'test',
    exchange    => '',
    routing_key => '',
    body        => $message,
    header      => {},
    mandatory   => 0,
    immediate   => 0,
    on_success =>
      sub { _on_publish_message_success( $condvar, $ar, $channel, @_ ) },    # next step: publish again
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
    on_ack          => sub { _error( $condvar, $ar, 'ack',          @_ ) },    # an ack is routed to _error() as well
    on_nack         => sub { _error( $condvar, $ar, 'nack',         @_ ) },
  );
  return;
}

############################################################################
sub _on_publish_message_success {    # publish() success handler: pause, then publish a follow-up message.
  my ( $condvar, $ar, $channel ) = @_;    # any further callback arguments are ignored
  _log( debug => "Published RabbitMQ message." );
  sleep 1;    # blocking one-second pause
  _publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );    # message carries the current epoch time
  return;
}

############################################################################
sub _error {    # Shared handler for all non-success callbacks: log the event and signal the condvar.
  my ( $condvar, $ar, $type, @error ) = @_;    # $type: callback name tag; @error: whatever the callback received
  _log( error => sprintf '%s - %s', $type, join ', ', @error );    # details are joined by plain stringification
  $condvar->send( $condvar, $ar, $type, @error );    # releases the $condvar->recv in main()
  return;
}

############################################################################
sub _log {    # Print one timestamped log line, including caller information, to STDERR.
  my ( $level, $message ) = @_;
  my @time = gmtime time;
  $time[5] += 1900;    # gmtime's year field counts from 1900
  $time[4] += 1;    # gmtime's month field is zero-based
  my $time = sprintf '%04d-%02d-%02dT%02d:%02d:%02d+00:00', @time[ 5, 4, 3, 2, 1, 0 ];    # year, month, day, hour, minute, second (UTC)
  my @caller0    = caller(0);    # where _log() itself was called: package, file, line
  my @caller1    = caller(1);    # where the sub that called _log() was called from
  my $subroutine = $caller1[3];    # fully qualified name of the sub that called _log()
  $subroutine =~ s/^$caller0[0]:://;    # drop the package prefix from the sub name
  print STDERR "$time [$level] $message at $caller0[1] line $caller0[2] ($subroutine; from $caller1[1] line $caller1[2])\n";
  return;
}

该程序应:
  • 连接到 RabbitMQ
  • 打开一个 RabbitMQ channel
  • 声明一个简单队列(名为“test”)
  • 绑定(bind)到该队列(名为“test”)
  • 发布消息(“Hello, world!”)
  • 成功发布消息后稍等片刻,再发布一条消息

  • 这个程序(主程序)不应该消费消息。还有其他程序可以完成这项工作。

    最小示例(见上文)将产生以下输出:
    2015-08-12T13:02:07+00:00 [debug] main at minimal.pl line 9 (main; from minimal.pl line 5)
    2015-08-12T13:02:07+00:00 [debug] Connecting to RabbitMQ... at minimal.pl line 13 (main; from minimal.pl line 5)
    2015-08-12T13:02:07+00:00 [debug] Connected to RabbitMQ. at minimal.pl line 36 (_on_connect_success; from minimal.pl line 22)
    2015-08-12T13:02:07+00:00 [debug] Opening RabbitMQ channel... at minimal.pl line 44 (_open_channel; from minimal.pl line 37)
    2015-08-12T13:02:07+00:00 [debug] Opened RabbitMQ channel. at minimal.pl line 58 (_on_open_channel_success; from minimal.pl line 46)
    2015-08-12T13:02:07+00:00 [debug] Declaring RabbitMQ queue... at minimal.pl line 66 (_declare_queue; from minimal.pl line 59)
    2015-08-12T13:02:07+00:00 [debug] Declared RabbitMQ queue. at minimal.pl line 88 (_on_declare_queue_success; from minimal.pl line 76)
    2015-08-12T13:02:07+00:00 [debug] Binding RabbitMQ queue... at minimal.pl line 96 (_bind_queue; from minimal.pl line 89)
    2015-08-12T13:02:07+00:00 [error] failure - Channel closed at minimal.pl line 155 (_error; from minimal.pl line 102)
    2015-08-12T13:02:07+00:00 [error] close - Net::AMQP::Frame::Method=HASH(0x38fe1c8) at minimal.pl line 155 (_error; from minimal.pl line 50)
    

    为什么 AnyEvent::RabbitMQ 或 RabbitMQ 本身会关闭 channel(被关闭的并不是连接)?还是我遗漏了什么?

    最佳答案

    如果您查看 RabbitMQ 服务器日志,您将看到如下内容:

    {amqp_error,access_refused,"operation not permitted on the default exchange",'queue.bind'}



    显然,RabbitMQ 不允许您在默认交换机(default exchange)上绑定(bind)队列。所以您需要先声明并绑定(bind)自己的交换机(exchange)。
    # Declare the fanout exchange 'testest' on $channel. On success the flow
    # continues in _on_declare_exchange_success(); every other callback is
    # reported through _error(), tagged with the callback's own name.
    sub _declare_exchange {
      my ( $condvar, $ar, $channel ) = @_;
      _log( debug => 'Declaring RabbitMQ exchange...' );

      # One reporter per non-success callback.
      my %report_for = map {
        my $kind = $_;
        ( "on_$kind" => sub { _error( $condvar, $ar, $kind, @_ ) } );
      } qw( failure read_failure return close );

      $channel->declare_exchange(
        exchange   => 'testest',
        type       => 'fanout',
        on_success => sub {
          _on_declare_exchange_success( $condvar, $ar, $channel, @_ );
        },
        %report_for,
      );
      return;
    }
    
    ############################################################################
    # declare_exchange() success handler: log the event, then bind the
    # exchange. Only the first three arguments (condvar, connection, channel)
    # are used; anything the callback received beyond them is ignored.
    sub _on_declare_exchange_success {
      my @context = @_[ 0 .. 2 ];
      _log( debug => 'Declared RabbitMQ exchange.' );
      _bind_exchange(@context);
      return;
    }
    
    ############################################################################
    # Bind exchange 'testest' (destination) to exchange 'testest' (source) with
    # an empty routing key, then continue in _on_bind_exchange_success().
    # NOTE(review): source and destination are the same exchange here --
    # confirm whether this self-binding is actually wanted.
    sub _bind_exchange {
      my ( $condvar, $ar, $channel ) = @_;
      _log( debug => 'Binding RabbitMQ exchange...' );

      my %binding = (
        source      => 'testest',
        destination => 'testest',
        routing_key => '',
      );
      my %callback = (
        on_success      => sub { _on_bind_exchange_success( $condvar, $ar, $channel, @_ ) },
        on_failure      => sub { _error( $condvar, $ar, 'failure', @_ ) },
        on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
        on_return       => sub { _error( $condvar, $ar, 'return', @_ ) },
        on_close        => sub { _error( $condvar, $ar, 'close', @_ ) },
      );

      $channel->bind_exchange( %binding, %callback );
      return;
    }
    

    一旦定义好这些子例程(sub),就让您的程序使用这个自定义交换机。
    # open_channel() success handler: call confirm() on the new channel, then
    # start the setup chain by declaring the exchange.
    sub _on_open_channel_success {
      my @context = @_[ 0 .. 2 ];    # ( $condvar, $ar, $channel )
      my $channel = $context[2];
      _log( debug => 'Opened RabbitMQ channel.' );
      $channel->confirm();
      _declare_exchange(@context);
      return;
    }
    
    必须调用 $channel->confirm,RabbitMQ 才会在您把消息发送到队列时做出应答并确认。如果不这样做,成功处理程序将永远不会被调用,因为不会有成功响应返回。

    然后在您的 _bind_queue 中,需要把该交换机添加到 bind_queue() 调用中。
      $channel->bind_queue(
        queue       => 'test',
        exchange    => 'testest', # <-- here
        routing_key => '',
        # ...
      );
    
    _publish_message 中的 publish() 调用也需要同样的修改。在那里,您还应该把 on_ack 处理程序替换为真正处理确认的代码。我想您本来就打算这样做,只是出现了复制/粘贴错误[1]。
    $channel->publish(
      queue       => 'test',
      exchange    => 'testest', # <-- here
      routing_key => '',
      # ...
      on_ack          => sub { 
      _on_publish_message_success( $condvar, $ar, $channel, @_ );
      },
    );
    

    还有一点:在 _on_publish_message_success 中调用 sleep。使用 AnyEvent 时这不是一个好主意,因为它会让整个程序停下来。请改用 AE::timer。
    my $t; 
    $t = AE::timer(1,0,sub {
      _publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
      undef $t;
    });
    

    这是包含所有更改的完整代码。
    use strictures 2;
    
    use AnyEvent::RabbitMQ;
    
    main();    # Script entry point: run the publisher defined in sub main.
    
    ############################################################################
    # Entry point. Connects to the RabbitMQ broker on localhost:5672 as
    # guest/guest and hands control to the callback chain that starts in
    # _on_connect_success(). Waits on a condition variable until some handler
    # signals it (in this program only _error() does), then closes the
    # connection. Takes no arguments and returns nothing.
    sub main {
      _log( debug => 'main' );
      my $condvar = AnyEvent->condvar;
      my $ar      = AnyEvent::RabbitMQ->new;
      $ar->load_xml_spec;
      _log( debug => 'Connecting to RabbitMQ...' );
      $ar->connect(
        host            => 'localhost',
        port            => 5672,
        user            => 'guest',
        pass            => 'guest',
        # '/' is the virtual host a stock RabbitMQ installation provides; a
        # differently named vhost only works if it was created on the broker.
        vhost           => '/',
        timeout         => 1,
        tls             => 0,
        on_success      => sub { _on_connect_success( $condvar, $ar, @_ ) },
        on_failure      => sub { _error( $condvar, $ar, 'failure', @_ ) },
        on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
        on_return       => sub { _error( $condvar, $ar, 'return', @_ ) },
        on_close        => sub { _error( $condvar, $ar, 'close', @_ ) },
      );
      $condvar->recv;    # wait until $condvar->send is called
      $ar->close;
      return;
    }
    
    ############################################################################
    # connect() success handler. Receives the shared condvar, the connection
    # object captured by main() (unused here) and the object handed to the
    # on_success callback; the channel is opened on the latter.
    sub _on_connect_success {
      my ( $condvar, undef, $connection ) = @_;
      _log( debug => 'Connected to RabbitMQ.' );
      _open_channel( $condvar, $connection );
      return;
    }
    
    ############################################################################
    # Ask the connection $ar for a new channel. On success the flow continues
    # in _on_open_channel_success(); every other callback is reported through
    # _error(), tagged with the callback's own name.
    sub _open_channel {
      my ( $condvar, $ar ) = @_;
      _log( debug => 'Opening RabbitMQ channel...' );

      # One reporter per non-success callback.
      my %report_for = map {
        my $kind = $_;
        ( "on_$kind" => sub { _error( $condvar, $ar, $kind, @_ ) } );
      } qw( failure read_failure return close );

      $ar->open_channel(
        on_success => sub { _on_open_channel_success( $condvar, $ar, @_ ) },
        %report_for,
      );
      return;
    }
    
    ############################################################################
    # open_channel() success handler: call confirm() on the new channel, then
    # start the setup chain by declaring the exchange.
    sub _on_open_channel_success {
      my @context = @_[ 0 .. 2 ];    # ( $condvar, $ar, $channel )
      my $channel = $context[2];
      _log( debug => 'Opened RabbitMQ channel.' );
      $channel->confirm();
      _declare_exchange(@context);
      return;
    }
    
    ############################################################################
    # Declare the fanout exchange 'testest' on $channel. On success the flow
    # continues in _on_declare_exchange_success(); every other callback is
    # reported through _error(), tagged with the callback's own name.
    sub _declare_exchange {
      my ( $condvar, $ar, $channel ) = @_;
      _log( debug => 'Declaring RabbitMQ exchange...' );

      # One reporter per non-success callback.
      my %report_for = map {
        my $kind = $_;
        ( "on_$kind" => sub { _error( $condvar, $ar, $kind, @_ ) } );
      } qw( failure read_failure return close );

      $channel->declare_exchange(
        exchange   => 'testest',
        type       => 'fanout',
        on_success => sub {
          _on_declare_exchange_success( $condvar, $ar, $channel, @_ );
        },
        %report_for,
      );
      return;
    }
    
    ############################################################################
    # declare_exchange() success handler: log the event, then bind the
    # exchange. Only the first three arguments (condvar, connection, channel)
    # are used; anything the callback received beyond them is ignored.
    sub _on_declare_exchange_success {
      my @context = @_[ 0 .. 2 ];
      _log( debug => 'Declared RabbitMQ exchange.' );
      _bind_exchange(@context);
      return;
    }
    
    ############################################################################
    # Bind exchange 'testest' (destination) to exchange 'testest' (source) with
    # an empty routing key, then continue in _on_bind_exchange_success().
    # NOTE(review): source and destination are the same exchange here --
    # confirm whether this self-binding is actually wanted.
    sub _bind_exchange {
      my ( $condvar, $ar, $channel ) = @_;
      _log( debug => 'Binding RabbitMQ exchange...' );

      my %binding = (
        source      => 'testest',
        destination => 'testest',
        routing_key => '',
      );
      my %callback = (
        on_success      => sub { _on_bind_exchange_success( $condvar, $ar, $channel, @_ ) },
        on_failure      => sub { _error( $condvar, $ar, 'failure', @_ ) },
        on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
        on_return       => sub { _error( $condvar, $ar, 'return', @_ ) },
        on_close        => sub { _error( $condvar, $ar, 'close', @_ ) },
      );

      $channel->bind_exchange( %binding, %callback );
      return;
    }
    
    ############################################################################
    # bind_exchange() success handler: the exchange is set up, so move on to
    # declaring the queue. Arguments beyond the first three are ignored.
    sub _on_bind_exchange_success {
      my $condvar = shift;
      my $ar      = shift;
      my $channel = shift;
      _log( debug => 'Binded RabbitMQ exchange.' );
      _declare_queue( $condvar, $ar, $channel );
      return;
    }
    
    
    ############################################################################
    # Declare the queue 'test' on $channel. On success the flow continues in
    # _on_declare_queue_success(); every other callback is reported through
    # _error(), tagged with the callback's own name.
    sub _declare_queue {
      my ( $condvar, $ar, $channel ) = @_;
      _log( debug => 'Declaring RabbitMQ queue...' );

      my %queue_spec = (
        queue       => 'test',
        auto_delete => 1,
        passive     => 0,
        durable     => 0,
        exclusive   => 0,
        no_ack      => 1,
        ticket      => 0,
      );

      # One reporter per non-success callback.
      my %report_for = map {
        my $kind = $_;
        ( "on_$kind" => sub { _error( $condvar, $ar, $kind, @_ ) } );
      } qw( failure read_failure return close );

      $channel->declare_queue(
        %queue_spec,
        on_success => sub {
          _on_declare_queue_success( $condvar, $ar, $channel, @_ );
        },
        %report_for,
      );
      return;
    }
    
    ############################################################################
    # declare_queue() success handler: log the event, then bind the queue.
    # Only the first three arguments (condvar, connection, channel) are used.
    sub _on_declare_queue_success {
      my @context = @_[ 0 .. 2 ];
      _log( debug => 'Declared RabbitMQ queue.' );
      _bind_queue(@context);
      return;
    }
    
    ############################################################################
    # Bind the queue 'test' to the exchange 'testest' with an empty routing
    # key, then continue in _on_bind_queue_success(). Every non-success
    # callback is reported through _error().
    sub _bind_queue {
      my ( $condvar, $ar, $channel ) = @_;
      _log( debug => 'Binding RabbitMQ queue...' );

      my %binding = (
        queue       => 'test',
        exchange    => 'testest',
        routing_key => '',
      );
      my %callback = (
        on_success      => sub { _on_bind_queue_success( $condvar, $ar, $channel, @_ ) },
        on_failure      => sub { _error( $condvar, $ar, 'failure', @_ ) },
        on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
        on_return       => sub { _error( $condvar, $ar, 'return', @_ ) },
        on_close        => sub { _error( $condvar, $ar, 'close', @_ ) },
      );

      $channel->bind_queue( %binding, %callback );
      return;
    }
    
    ############################################################################
    # bind_queue() success handler: setup is complete, so announce readiness
    # and publish the first message. Arguments beyond the first three
    # (condvar, connection, channel) are ignored.
    sub _on_bind_queue_success {
      my @context       = @_[ 0 .. 2 ];
      my $first_message = 'Hello, world!';
      _log( debug => 'Binded RabbitMQ queue.' );
      _log( info => 'Master ready to publish messages.' );
      _publish_message( @context, $first_message );
      return;
    }
    
    ############################################################################
    # Publish $message to the exchange 'testest' on $channel.
    #
    # Arguments: the shared condvar, the connection, the channel and the
    # message body. Returns nothing.
    #
    # Success is signalled solely by the broker's ack: the channel has
    # confirm() called on it in _on_open_channel_success(), and on_ack leads to
    # _on_publish_message_success(). Do not also register an on_success entry
    # for the same continuation, or the follow-up publish could be scheduled
    # twice. A nack and every other callback are reported through _error().
    sub _publish_message {
      my ( $condvar, $ar, $channel, $message ) = @_;
      _log( debug => "Publishing RabbitMQ message ($message)..." );
      $channel->publish(
        queue       => 'test',
        exchange    => 'testest',
        routing_key => '',
        body        => $message,
        header      => {},
        mandatory   => 0,
        immediate   => 0,
        on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
        on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
        on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
        on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
        on_ack          => sub {
          _on_publish_message_success( $condvar, $ar, $channel, @_ );
        },
        on_nack         => sub { _error( $condvar, $ar, 'nack',         @_ ) },
      );
      return;
    }
    
    ############################################################################
    # Publish-confirmed handler: log the event and schedule the next publish
    # one second later with a one-shot AE::timer instead of blocking.
    sub _on_publish_message_success {
      my ( $condvar, $ar, $channel ) = @_;
      _log( debug => "Published RabbitMQ message." );

      # The callback closes over $timer so the watcher stays alive until it
      # fires; clearing the variable inside the callback then releases it.
      my $timer;
      $timer = AE::timer(
        1, 0,
        sub {
          _publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
          undef $timer;
        }
      );
      return;
    }
    
    ############################################################################
    # Shared handler for every non-success callback: log what happened and
    # signal the condvar so that the recv() in main() returns.
    #
    # Arguments: the condvar, the connection, a tag naming the callback
    # ('failure', 'close', ...) and whatever arguments the callback received.
    # Returns nothing.
    #
    # Detail arguments that are frame objects exposing
    # method_frame->reply_code / ->reply_text are logged as "code: text"
    # rather than as an opaque "Class=HASH(0x...)" string; undefined details
    # are logged as "(undef)". The values sent through the condvar are the
    # original, unmodified arguments.
    sub _error {
      my ( $condvar, $ar, $type, @error ) = @_;
      require Scalar::Util;

      my @described = map {
        my $detail = $_;
        my $frame =
          Scalar::Util::blessed($detail) && $detail->can('method_frame')
          ? $detail->method_frame
          : undef;
        Scalar::Util::blessed($frame)
          && $frame->can('reply_code')
          && $frame->can('reply_text')
          ? join( ': ', map { defined $_ ? $_ : '' } $frame->reply_code, $frame->reply_text )
          : ( defined $detail ? "$detail" : '(undef)' );
      } @error;

      _log( error => sprintf '%s - %s', $type, join ', ', @described );
      $condvar->send( $condvar, $ar, $type, @error );
      return;
    }
    
    ############################################################################
    # Print one log line to STDERR in the form
    #   <UTC timestamp> [<level>] <message> at <file> line <n> (<sub>; from <file> line <n>)
    #
    # Arguments: the level tag and the message text. Returns nothing.
    #
    # <sub> is the subroutine that called _log(), with the calling package's
    # prefix removed; the "from" part is where that subroutine was called. When
    # _log() is called outside any subroutine there is no such frame, so the
    # call site itself is reported and <sub> reads "main program" (this avoids
    # uninitialized-value warnings).
    sub _log {
      my ( $level, $message ) = @_;

      require POSIX;
      my $time = POSIX::strftime( '%Y-%m-%dT%H:%M:%S+00:00', gmtime() );

      my ( $package, $file, $line ) = caller(0);
      my ( undef, $from_file, $from_line, $subroutine ) = caller(1);
      if ( defined $subroutine ) {
        # \Q...\E: the package name is literal text, not a pattern.
        $subroutine =~ s/^\Q$package\E:://;
      }
      else {
        ( $subroutine, $from_file, $from_line ) = ( 'main program', $file, $line );
      }

      print STDERR "$time [$level] $message at $file line $line ($subroutine; from $from_file line $from_line)\n";
      return;
    }
    

    1)在有些地方,出了这种错是要请同事喝啤酒的 :)

    关于perl - AnyEvent::RabbitMQ 关闭 channel 的问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31966007/

    相关文章:

    php - RabbitMQ 单一消费者,消费来自多个队列的消息

    python - Ubuntu 上的 Docker 网络主机

    perl - HTTP 响应 : Net/HTTP/Methods. pm 第 542 行中的错误 block 大小

    perl - 如何在具有传递引用的子例程中使用 PDL rcols?

    perl - Net::LDAPS 在 SSL 连接期间抛出未知错误

    ssl - 如何使用 MQ 的 ssl 证书打开 kdb 文件?

    asp.net-web-api - ASP.Net Web API - 扩展大量写操作

    regex - qr() 的输出何时以及为何发生变化?

    java - RabbitMQ RPC 与交换问题

    c# - Azure函数: async method and output parameters