每当输入product_id时,我的代码都会使用API创建订单。然后,它会在定时间隔循环中检查订单是否已成功创建。我绝对没有正确使用 asyncio,希望有人可以提供提示,或者 asyncio 是否是完成这项工作的正确工具?
我在线检查了文档和示例,但发现很难理解,很可能是因为我仍在学习如何编码(使用 Python 3.7.1 的初学者程序员)
import asyncio
import requests
import time
#creates order when a product id is entered
async def create_order(product_id):
url = "https://api.url.com/"
payload = """{\r\n \"any_other_field\": [\"any value\"]\r\n }\r\n}"""
headers = {
'cache-control': "no-cache",
}
response = requests.request("POST", url, data=payload, headers=headers)
result = response.json()
request_id = result['request_id']
result = asyncio.ensure_future(check_orderloop('request_id'))
return result
#loops and check the status of the order.
async def check_orderloop(request_id):
print("checking loop: " + request_id)
result = check_order(request_id)
while result["code"] == "processing request":
while time.time() % 120 == 0:
await asyncio.sleep(120)
result = check_order(request_id)
print('check_orderloop: ' + result["code"])
if result["code"] != "processing request":
if result["code"] == "order_status":
return result
else:
return result
#checks the status of the order with request_id
async def check_order(request_id):
print("checking order id: " + request_id)
url = "https://api.url.com/" + request_id
headers = {
'cache-control': "no-cache"
}
response = requests.request("GET", url, headers=headers)
result = response.json()
return result
asyncio.run(create_order('product_id'))
W/o asyncio,它一次只能创建+检查一个订单。我希望代码能够同时异步地创建+检查许多不同的订单。
当我通过尝试两个不同的产品 ID 进行测试时,它给出了以下内容,但没有完成功能。
<coroutine object create_order at 0x7f657b4d6448>
>create_order('ID1234ABCD')
__main__:1: RuntimeWarning: coroutine 'create_order' was never awaited
<coroutine object create_order at 0x7f657b4d6748>
最佳答案
在异步代码中使用阻塞请求库是不好的。您的程序将等待每个请求完成,并且在此期间不会执行任何操作。您可以使用async aiohttp
library相反。
您不能只调用async
函数并期望它被执行,它只会返回一个coroutine
对象。您必须通过asyncio.run()
将此协程添加到事件循环中,您在代码中是如何做的:
>>> asyncio.run(create_order('ID1234ABCD'))
或者,如果您想同时运行多个协程:
>>> asyncio.run(
asyncio.wait([
create_order('ID1234ABCD'),
create_order('ID1234ABCD')
])
)
此外,如果您想从 REPL 轻松测试 async
函数,您可以使用 IPython ,它可以直接从 REPL 运行异步函数(仅适用于 7.0 以上的版本)。
安装:
pip3 install ipython
运行:
ipython
现在您可以等待异步函数了:
In [1]: await create_order('ID1234ABCD')
关于python - 如何将 Asyncio 与 while 循环一起使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54686404/