c - AMQP RabbitMQ 消费者互相阻塞?

标签 c rabbitmq amqp librabbitmq

我编写了一个 C (rabbitmq-c) worker 应用程序,它使用由 Python 脚本 (pika) 发布的队列。

我有以下似乎无法解决的奇怪行为:

  1. 在消息发布到队列之前启动所有 worker 按预期工作
  2. 在队列发布后启动 1 个 worker 按预期工作
  3. 但是:在一个工作人员开始从队列中消费后启动额外的工作人员意味着这些工作人员在队列中看不到任何消息(消息计数=0),因此只是等待(即使仍然有很多消息在队列中)。杀死第一个工作人员会突然开始向所有其他(等待的)消费者发送消息。

知道会发生什么吗?

我已经尝试确保每个消费者都有自己的 channel (这有必要吗?)但仍然是相同的行为...

这是消费者( worker )的代码:

conn = amqp_new_connection();
sock = (amqp_socket_t *)(uint64_t)amqp_tcp_socket_new(conn);
amqp_socket_open(sock, "localhost", 5672);
amqp_login(conn,
           "/",
           0,
           131072,
           0,
           AMQP_SASL_METHOD_PLAIN,
           "guest",
           "guest");

if (amqp_channel_open(conn, chan) == NULL)
    LOG_ERR(" [!] Failed to open amqp channel!\n");

if ((q = amqp_queue_declare(conn,
                            chan,
                            amqp_cstring_bytes("ranges"),
                            0,
                            0,
                            0,
                            0,
                            amqp_empty_table)) == NULL)
    LOG_ERR(" [!] Failed to declare queue!\n");

LOG_INFO(" [x] Queue (message count = %d)\n", q->message_count);

amqp_queue_bind(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, amqp_empty_table);
amqp_basic_consume(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, 0, 0, 0, amqp_empty_table);

while(1) {
    amqp_maybe_release_buffers(conn);
    amqp_consume_message(conn, &e, NULL, 0);

    {
        int n;
        amqp_frame_t f;
        unsigned char buf[8];
        unsigned char *pbuf = buf;

        amqp_simple_wait_frame(conn, &f);       // METHOD frame
        amqp_simple_wait_frame(conn, &f);       // HEADER frame

        n = f.payload.properties.body_size;
        if (n != sizeof(range_buf))
            LOG_ERR(" [!] Invalid message size!");

        while (n) {
            amqp_simple_wait_frame(conn, &f);   // BODY frame
            memcpy(pbuf,
                   f.payload.body_fragment.bytes,
                   f.payload.body_fragment.len);
            n -= f.payload.body_fragment.len;
            pbuf += f.payload.body_fragment.len;
        }

        // do something with buf

        LOG_INFO(" [x] Message recevied from queue\n");
    }

    amqp_destroy_envelope(&e);

    amqp_maybe_release_buffers(conn);
}

最佳答案

这里的问题很可能是您的消费者在启动时预取了所有消息。这是 RabbitMQ 的默认行为,但您可以减少消费者预取的消息数量,以便更好地将工作负载分配给多个工作人员。

这只是意味着一个或多个消费者将接收所有消息,不为新消费者留下任何消息。

如果您将 qos 应用于您的消费者并将预取限制为 10 条消息。消费者只会排队前 10 条消息,新消费者可以收拾残局。

您正在寻找实现此功能的函数称为 amqp_basic_qos ,此外,您还可以阅读有关消费者预取的更多信息 here .

关于c - AMQP RabbitMQ 消费者互相阻塞?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25072527/

相关文章:

开罗图形: how to determine the physical screen dimensions

python - 使用 aioamqp 同时从多个队列消费

rabbitmq - 是否有可能在 amqp 中获取未路由的消息?

c - 将未知大小的 C 数组传递给函数

c - 我自己的 strcpy 函数

c - Fortran 和 c 在源代码如何接口(interface)和编译在一起方面的细微差别

java - Spring RabbitMQ教程导致Connection Refused错误

java - 带有线程池执行器服务的 Vert.x 服务,CPU 使用率高

node.js - amqpassertQueuebork一个连接的含义

node.js - 如何在nodejs上使用RabbitMQ的消息