python - 设置 Python Pub/Sub 异步拉取订阅者线程数

标签 python multithreading python-3.x asynchronous google-cloud-pubsub

我使用 Python 实现了异步拉订阅器。这是基本代码

def receive_messages(project, subscription_name):

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        print ("A")
        time.sleep(2)
        print('Received message: {}'.format(message))
        message.ack()
        print ("B")

    subscriber.subscribe(subscription_path, callback=callback)

    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)

我需要像这样打印

一个,

消息

B

一个

消息

B

(我需要按顺序运行)或通过给定的线程数接收消息。我找不到限制线程数的方法。由于线程较多,我的程序出现段错误

如何控制没有线程接收消息。

最佳答案

可以使用策略解决问题

from google.cloud import pubsub_v1
from concurrent import futures

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project, subscription_name)

def callback(message):
        print (str(message.data) + " " + str(threading.current_thread()))
        message.ack()
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
executor = futures.ThreadPoolExecutor(max_workers=5)
policy = pubsub_v1.subscriber.policy.thread.Policy(subscriber, subscription_path, executor=executor, flow_control=flow_control)
policy.open(callback)

我们可以使用ma​​x_workers设置最大线程数。还可以设置流量控制设置。

关于python - 设置 Python Pub/Sub 异步拉取订阅者线程数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48009911/

相关文章:

c - 为什么 futex 在唤醒线程时比 mutex 花费更长的时间?

python-3.x - 如何使用 paramiko 或其他方式从 vps ip 获取本地 PC 上的文件?

python - 将一列值的总和除以数据框中所有行的计数

regex - 使用正则表达式或漂亮的汤从 Instagram 抓取某人的网站

python - 使用顺序求解器时,如何在 Python Gekko 中定义 Intermediate 的最大值和另一个值?

Python 迭代切片日期字符串列表的最快方法

python - 读取SAS文件以获取元信息

Python:属性错误:_dep_map

android - 尝试稍等片刻,然后刷新适配器

mysql - 如何处理对 mysql 的多线程请求以获得空闲行?