我有一个 Python Kafka 消费者应用程序,我在其中消费消息,然后同步调用外部网络服务。 Web 服务需要一分钟时间来处理消息并发送响应。
有没有办法在不等待响应的情况下使用消息、向 Web 服务发送请求并使用下一条消息?
from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer(
'spring_test',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')));
这就是我等待消息和发送外部 Web 请求的方式
def consume_msgs():
for message in consumer:
message = message.value;
send('{}'.format(message))
consume_msgs()
函数 send()
需要一分钟才能收到响应。我想同时开始异步消费下一条消息,但我不知道从哪里开始
def send(pload) :
import requests
r = requests.post('someurl',data = pload)
print(r)
最佳答案
不确定这是否是您所需要的,但您能否将每次对 send
的调用旋转到一个线程中?像下面这样的东西。这样 for 循环将继续而无需等待 send
返回。如果您消耗数据的速度远快于处理数据的速度,则可能必须以某种方式限制线程数。
from threading import Thread
def consume_msgs():
for message in consumer:
message = message.value;
Thread(target=send, args = ('{}'.format(message),)).start()
consume_msgs()
关于python - 如何以 “fire and forget” 异步运行函数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64597594/