python - 如何以 “fire and forget” 异步运行函数?

标签 python apache-kafka python-asyncio

我有一个 Python Kafka 消费者应用程序,我在其中消费消息,然后同步调用外部网络服务。 Web 服务需要一分钟时间来处理消息并发送响应。

有没有办法在不等待响应的情况下使用消息、向 Web 服务发送请求并使用下一条消息?


from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'spring_test',
     bootstrap_servers=['localhost:9092'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='my-group',
     value_deserializer=lambda x: loads(x.decode('utf-8')));

这就是我等待消息和发送外部 Web 请求的方式


def consume_msgs():
 for message in consumer:
    message = message.value;
    send('{}'.format(message))

consume_msgs() 

函数 send() 需要一分钟才能收到响应。我想同时开始异步消费下一条消息,但我不知道从哪里开始

def send(pload) :
 import requests
 r = requests.post('someurl',data = pload)
 print(r)

最佳答案

不确定这是否是您所需要的,但您能否将每次对 send 的调用旋转到一个线程中?像下面这样的东西。这样 for 循环将继续而无需等待 send 返回。如果您消耗数据的速度远快于处理数据的速度,则可能必须以某种方式限制线程数。

from threading import Thread  


def consume_msgs():
 for message in consumer:
    message = message.value;
    Thread(target=send, args = ('{}'.format(message),)).start()

consume_msgs() 

关于python - 如何以 “fire and forget” 异步运行函数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64597594/

相关文章:

python - 模型拆分为多个文件的 Flask,如何防止循环导入?

apache-kafka - 使用 loadrunner 对 Kafka 进行性能测试

apache-kafka - java.lang.ClassNotFoundException : kafka. api.OffsetRequest

apache-kafka - kafka __consumer_offsets 主题的分区数过多

python - 在 Python asyncio 中检测套接字 EOF

python - `asyncio.sleep(delay)` 是否保证至少休眠 `delay` 秒?

python - 将字符串的字符与字典python进行比较

python - Tkinter .after 方法卡住窗口?

asynchronous - 阻止获取aio_pika

python - 将 3D 切片绘制为热图