我使用 Python 实现了异步拉订阅器。这是基本代码
def receive_messages(project, subscription_name):
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
def callback(message):
print ("A")
time.sleep(2)
print('Received message: {}'.format(message))
message.ack()
print ("B")
subscriber.subscribe(subscription_path, callback=callback)
print('Listening for messages on {}'.format(subscription_path))
while True:
time.sleep(60)
我需要像这样打印
一个,
消息
B
一个
消息
B
(我需要按顺序运行)或通过给定的线程数接收消息。我找不到限制线程数的方法。由于线程较多,我的程序出现段错误。
如何控制没有线程接收消息。
最佳答案
可以使用策略解决问题
from google.cloud import pubsub_v1
from concurrent import futures
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project, subscription_name)
def callback(message):
print (str(message.data) + " " + str(threading.current_thread()))
message.ack()
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
executor = futures.ThreadPoolExecutor(max_workers=5)
policy = pubsub_v1.subscriber.policy.thread.Policy(subscriber, subscription_path, executor=executor, flow_control=flow_control)
policy.open(callback)
我们可以使用max_workers设置最大线程数。还可以设置流量控制设置。
关于python - 设置 Python Pub/Sub 异步拉取订阅者线程数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48009911/