我编写了一个 C (rabbitmq-c) worker 应用程序,它使用由 Python 脚本 (pika) 发布的队列。
我有以下似乎无法解决的奇怪行为:
- 在消息发布到队列之前启动所有 worker 按预期工作
- 在队列发布后启动 1 个 worker 按预期工作
- 但是:在一个工作人员开始从队列中消费后启动额外的工作人员意味着这些工作人员在队列中看不到任何消息(消息计数=0),因此只是等待(即使仍然有很多消息在队列中)。杀死第一个工作人员会突然开始向所有其他(等待的)消费者发送消息。
知道会发生什么吗?
我已经尝试确保每个消费者都有自己的 channel (这有必要吗?)但仍然是相同的行为...
这是消费者( worker )的代码:
conn = amqp_new_connection();
sock = (amqp_socket_t *)(uint64_t)amqp_tcp_socket_new(conn);
amqp_socket_open(sock, "localhost", 5672);
amqp_login(conn,
"/",
0,
131072,
0,
AMQP_SASL_METHOD_PLAIN,
"guest",
"guest");
if (amqp_channel_open(conn, chan) == NULL)
LOG_ERR(" [!] Failed to open amqp channel!\n");
if ((q = amqp_queue_declare(conn,
chan,
amqp_cstring_bytes("ranges"),
0,
0,
0,
0,
amqp_empty_table)) == NULL)
LOG_ERR(" [!] Failed to declare queue!\n");
LOG_INFO(" [x] Queue (message count = %d)\n", q->message_count);
amqp_queue_bind(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, amqp_empty_table);
amqp_basic_consume(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
while(1) {
amqp_maybe_release_buffers(conn);
amqp_consume_message(conn, &e, NULL, 0);
{
int n;
amqp_frame_t f;
unsigned char buf[8];
unsigned char *pbuf = buf;
amqp_simple_wait_frame(conn, &f); // METHOD frame
amqp_simple_wait_frame(conn, &f); // HEADER frame
n = f.payload.properties.body_size;
if (n != sizeof(range_buf))
LOG_ERR(" [!] Invalid message size!");
while (n) {
amqp_simple_wait_frame(conn, &f); // BODY frame
memcpy(pbuf,
f.payload.body_fragment.bytes,
f.payload.body_fragment.len);
n -= f.payload.body_fragment.len;
pbuf += f.payload.body_fragment.len;
}
// do something with buf
LOG_INFO(" [x] Message recevied from queue\n");
}
amqp_destroy_envelope(&e);
amqp_maybe_release_buffers(conn);
}
最佳答案
这里的问题很可能是您的消费者在启动时预取了所有消息。这是 RabbitMQ 的默认行为,但您可以减少消费者预取的消息数量,以便更好地将工作负载分配给多个工作人员。
这只是意味着一个或多个消费者将接收所有消息,不为新消费者留下任何消息。
如果您将 qos 应用于您的消费者并将预取限制为 10 条消息。消费者只会排队前 10 条消息,新消费者可以收拾残局。
您正在寻找实现此功能的函数称为 amqp_basic_qos ,此外,您还可以阅读有关消费者预取的更多信息 here .
关于c - AMQP RabbitMQ 消费者互相阻塞?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25072527/