python - RabbitMQ - 从同一主题读取的多个实例

标签 python apache-kafka rabbitmq

我有来自不同应用程序的多个生产者向 RabbitMQ 中的主题发送消息。来自不同应用程序的多个消费者阅读这些主题。这个简单的架构作为 PoC 一直在完美运行。 但是现在我有来自这些应用程序的多个实例,我不希望应用程序 X 实例 1 读取与应用程序 X 实例 2 相同的消息。但是应用程序 X 和应用程序 Y(及其所有实例)需要从同一主题变为红色。

我知道如果消费者共享相同的消费者 ID,Karaf 会平衡来自主题的消息的消费。 RabbitMQ 中存在此功能吗?我一直在阅读文档,但没有找到类似的内容。

最佳答案

我相信你需要kafka的消费者组功能。

(对于每条消息,不同的消费者组应该一起消费,但每个消费者组中只有一个消费者可以消费这条消息)

参见 rabbitmq getstarted ,可以结合Topics模式和Work queues模式来实现这个功能。

示例代码

receive.py

#!/usr/bin/env python

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

queue_name = sys.argv[1]
channel.queue_declare(queue=queue_name)

channel.queue_bind(exchange='topic_logs',
                   queue=queue_name,
                   routing_key='my_key')

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue=queue_name)

channel.start_consuming()

send.py

#!/usr/bin/env python

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = 'my_key'
message = 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()

如何运行?

需要启动4个消费者来解释这道题:

python receive.py consumer_group1
python receive.py consumer_group1
python receive.py consumer_group2
python receive.py consumer_group2

以上交叉到app1(instance1), app1(instance2), app2(instance1), app2(instance2)

然后,开始send.py:

python send.py

您将看到每个可以获取消息的应用程序的一个实例。 如果您再次发送,则来自两个不同应用的另一个实例可以接收该消息。

关于python - RabbitMQ - 从同一主题读取的多个实例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48956267/

相关文章:

java - RabbitMQ 示例代码未在 Java 中编译

python - P12 中 pyOpenSSL 中的 RSA key 参数

python - 是否可以更正我在 R 中对 paste0() 的使用,以便该函数运行得与原始 Python 示例一样快?

scala - 如何改善reactive-kafka(Scala和Akka Streams)的缓慢性能?

mysql - 将包含数据的新表包含到现有 Debezium 连接器中

c# - 无法使用 rabbitMQ 加载 System.Threading.Tasks.Extensions 4.2.0.0

java - 如何消费一条消息?

python - 如何判断 python 变量中的数据类型?

python - 如何使用 python 脚本控制 docker

apache-kafka - 每个主题的 Kafka 保留.bytes 和全局 log.retention.bytes 不起作用