我正在开发一个抓取工具(python 3.9),我需要启动 make_attempt() ,并根据其执行结果启动应同时工作的其他任务。
首先,我创建初始任务并将其添加到存储所有异步任务的列表中:
self.data["worker"]["tasks"]
然后我启动:
await asyncio.gather(*self.data["worker"]["tasks"])
在 make_attempt() 中,我等待对服务器的 POST 请求结果(我使用 aiohttp 客户端),然后根据结果添加新任务,或者在短暂延迟后重复 make_attempt() 。
我停止当前任务并将其从异步任务列表中删除,然后添加新任务。
async def make_attempt(self):
    """Perform one attempt, deregister its finished task, and schedule a retry.

    Fixes over the original sketch:
    - The task is only *removed* from the registry, never cancel()ed:
      cancelling the task we are currently running inside would raise
      CancelledError right here instead of letting the coroutine finish.
    - No asyncio.gather() at the end: each attempt awaiting a gather that
      contains the *next* attempt keeps every parent coroutine suspended in
      the await chain, which is what eventually blows up with RecursionError.
    """
    attempt: int = self.data["res"]["attempt"]
    await self.do_something()
    await sleep(1)
    # Drop this attempt's (now finished) task from the shared registry.
    self.data["worker"]["tasks"] = [
        task for task in self.data["worker"]["tasks"]
        if task.get_name() != str(attempt)
    ]
    if 1 > 0:  # a condition to start make_attempt() again
        # Schedule the retry and return immediately; whoever owns the task
        # list (run()) is responsible for awaiting it.
        self.data["worker"]["tasks"].append(
            asyncio.create_task(self.make_attempt(), name=attempt)
        )
async def run(self):
    """Kick off the first attempt and wait until no tasks remain.

    A single gather() only waits on the snapshot of tasks taken at call
    time, so tasks appended later (retries spawned by make_attempt) would
    be orphaned.  Keep re-gathering until the registry stays empty.
    """
    self.data["worker"]["tasks"].append(
        asyncio.create_task(self.make_attempt(), name=self.data["res"]["attempt"])
    )
    while self.data["worker"]["tasks"]:
        snapshot = list(self.data["worker"]["tasks"])
        await asyncio.gather(*snapshot)
        # Prune finished tasks; anything left was appended while we waited.
        self.data["worker"]["tasks"] = [
            t for t in self.data["worker"]["tasks"] if not t.done()
        ]
我是 asyncio 的新手,所以也许你可以指出我的错误或建议更好的实现。
更新。这是我想要实现的目标的架构:
main_task 应该运行,如果结果正常,它应该启动另一个任务的几个实例(参见循环 2)。当获得loop2中任务的结果时,应该运行一个新的子任务。 main_task 应该等待 Loop2 中的所有任务完成或触发超时。 Loop2 中的所有任务应该同时工作。
UPD 2. 在 check_base_url() 方法循环执行约 1000 次后,此代码会抛出 RecursionError: maximum recursion depth exceeded while calling a Python object。
class Scraper:
    """Scraping pipeline: check the base url, then fan out parallel task groups."""

    def __init__(self, data):
        # Shared mutable state (counters, urls, results) used by all tasks.
        self.data = data

    async def get_response(self, session, url, method="get", *args, **kwargs) -> Union[
            ClientResponse, None]:
        """GET or POST `url` with up to 20 retries.

        Returns the response on success (status <= 399) or None when every
        attempt failed.  The duplicated get/post branches of the original
        are folded into a single code path.
        """
        request = session.get if method == "get" else session.post
        for _ in range(20):
            try:
                response = await request(url, headers={}, proxy="_proxy", *args, **kwargs)
                if response.status > 399:
                    raise ScraperError(response.status)
                await sleep(0.1)
                return response
            except (ClientError, ScraperError):
                await sleep(0.25)
        return None

    async def get_captcha(self) -> SolvedCaptcha:
        """Try up to 20 times to obtain a solved captcha.

        Bug fix: the original did ``if captcha: continue`` — i.e. it
        *retried on success* and always fell through to an implicit None.
        Return the captcha as soon as one is solved; None if all tries fail.
        """
        for _ in range(20):
            captcha = await self.task_1()
            if captcha:
                return captcha
        return None

    async def final_task(self, url) -> bool:
        """Run the multi-step finishing flow; True only if every step succeeds.

        Rewritten with guard clauses instead of the nested if/else pyramid;
        the success/failure outcomes are unchanged.
        """
        async with ClientSession(cookies={}) as sess:
            resp_step1: Union[ClientResponse, None] = await self.get_response(
                sess, "url", "post", data={})
            if not resp_step1:
                return False
            resp_step2: Union[ClientResponse, None] = await self.get_response(sess, "url", "get")
            if not resp_step2:
                return False
            captcha: SolvedCaptcha = await self.get_captcha()
            if not captcha:
                return False
            resp_captcha: Union[ClientResponse, None] = await self.get_response(
                sess, "url", "post", data={})
            if not resp_captcha:
                return False
            if 2 > 1:  # placeholder for the real success condition
                print("FINISHED")
                return True
            return False

    async def add_task_3(self) -> None:
        """Spawn the final task once the precondition holds.

        Bug fix: the else-branch used to ``await self.add_task_3()``
        recursively, growing the await chain; loop instead.
        """
        while True:
            if 2 > 1:  # placeholder precondition
                subtasks = [asyncio.create_task(
                    self.final_task(self.data["res"]["slots_urls"][0]))]
                await asyncio.gather(*subtasks)
                return
            # Precondition not met yet: yield to the event loop and retry
            # (the bare recursion never yielded, starving other tasks).
            await sleep(0.1)

    def parse(self, html: str, url: str) -> None:
        """Parse the fetched HTML (extraction logic omitted in this sketch)."""
        soup = BeautifulSoup(html, "lxml")
        # do parsing

    async def task_2(self, url) -> bool:
        """Fetch `url` and parse it; False if the request ultimately failed."""
        async with ClientSession() as sess:
            resp: Union[ClientResponse, None] = await self.get_response(sess, url)
            if not resp:
                return False
            html = await resp.text()
            self.parse(html, url)
            # Bug fix: declared -> bool but the original fell through and
            # implicitly returned None on the success path.
            return True

    async def add_task_2(self) -> None:
        """Fan out task_2 over the target urls and wait for all of them."""
        if 2 > 1:  # placeholder precondition
            subtasks = [asyncio.create_task(self.task_2(url)) for url in ["url1", "url2"]]
            await asyncio.gather(*subtasks)

    async def task_1(self) -> bool:
        """Request one captcha solution; True on success."""
        self.data["res"]["captcha_requested"] += 1
        res = await self.captcha.task_1()
        return bool(res)

    async def add_task_1(self) -> None:
        """Fan out 5 parallel captcha requests."""
        if 2 > 1:  # placeholder precondition
            subtasks = [asyncio.create_task(self.task_1()) for _ in range(5)]
            await asyncio.gather(*subtasks)

    async def get_calendar_url(self, sess) -> bool:
        """POST for the calendar url; True if a usable response came back."""
        resp: Union[ClientResponse, None] = await self.get_response(
            sess, "url", method="post", data={})
        return resp is not None

    async def check_base_url(self) -> bool:
        """Open a throwaway session and verify the base url answers."""
        async with ClientSession() as session_0:
            return await self.get_calendar_url(session_0)

    async def schedule_tasks(self):
        """Main scheduling loop.

        Bug fix (the asker's UPD 2): the original awaited itself
        recursively after every cycle — both on success and on failure.
        Each recursive call keeps the caller's frame alive in the await
        chain, so after ~1000 cycles Python raises ``RecursionError:
        maximum recursion depth exceeded while calling a Python object``.
        A plain ``while`` loop runs at constant depth.
        """
        def start_again() -> bool:
            # Placeholder for the real "should we do another cycle?" check.
            if 2 > 1:
                return True
            return False

        while True:
            res_base_url: bool = await self.check_base_url()
            if res_base_url:
                tasks = [asyncio.create_task(self.add_task_1()),
                         asyncio.create_task(self.add_task_2()),
                         asyncio.create_task(self.add_task_3())]
                await asyncio.gather(*tasks)
                if not start_again():
                    return
                await sleep(0.1)
            # On failure simply fall through and retry the base-url check.

    async def run(self):
        """Public entry point."""
        await self.schedule_tasks()
最佳答案
一种方法是使用 asyncio.Queue
管理您的应用程序中待处理的作业。然后,您可以创建许多工作任务来从该队列接收作业,也可以添加新作业。
在示例中,使用 asyncio.sleep
模拟实际工作(POST 请求并处理一些数据)。有些工作会产生新的工作,而另一些则不会。
worker 将通过这种方式同时接手和工作。
Manager
负责创建作业队列并等待所有项目被处理。之后它将取消所有 worker 任务,程序随之终止。
代码
import asyncio
import random
class Worker:
    """A consumer that endlessly pulls targets off a shared queue, processes
    them, and occasionally produces follow-up targets of its own."""

    def __init__(self, num, target_q):
        # Identifier used only in log output.
        self.num = num
        # Shared job queue (owned by the Manager).
        self.target_q = target_q
        # Start consuming immediately; the Manager cancels this task later.
        self.task = asyncio.create_task(self.run())

    async def run(self):
        # Loop forever; the surrounding task is cancelled externally once
        # the queue has been fully drained.
        while True:
            print(f"Worker {self.num}: Waiting for new target")
            target = await self.target_q.get()
            print(f"Worker {self.num}: Processing target {target}")
            try:
                # Stand-in for the real work (e.g. a POST request + parsing).
                await asyncio.sleep(1.0)
                # A processed target may spawn 0-2 follow-up targets.
                new_target_count = random.randint(0, 2)
                if new_target_count:
                    print(
                        f"Worker {self.num}: Target {target} generating {new_target_count} more targets"
                    )
                    for _ in range(new_target_count):
                        new_target = random.randint(1, 10000)
                        # NOTE(review): put() blocks while we still hold an
                        # unfinished item; if the bounded queue fills up and
                        # every worker is stuck here, the pool could deadlock
                        # — confirm the queue size vs. fan-out ratio.
                        await self.target_q.put(new_target)
            finally:
                print(f"Worker {self.num}: Target {target} done")
                # Balance the earlier get() so Queue.join() can complete.
                self.target_q.task_done()
class Manager:
    """Owns the job queue, spins up the worker pool, and tears it down once
    every queued job has been marked done."""

    def __init__(self):
        # Bounded queue of pending jobs.  Plain ints in this demo, but any
        # payload would work.
        self.target_q = asyncio.Queue(10)
        # Populated in run(); None until the pool exists.
        self.workers = None

    async def run(self):
        # Seed the queue so the workers have something to start on.
        await self.target_q.put(1)
        # Spin up a pool of three workers.
        self.workers = [Worker(num, self.target_q) for num in range(3)]
        # Block until every get() has been matched by a task_done().
        await self.target_q.join()
        # Nothing left to process — cancel the idle workers.
        for worker in self.workers:
            worker.task.cancel()
def main():
    """Program entry point: process jobs until the queue drains.

    Uses asyncio.run(), which creates, runs, and closes the event loop in
    one call; the manual get_event_loop()/run_until_complete()/close()
    sequence is the legacy pre-3.7 pattern and is deprecated for this use.
    """
    asyncio.run(Manager().run())


if __name__ == "__main__":
    main()
结果示例
$ python workers_test.py
Worker 0: Waiting for new target
Worker 0: Processing target 1
Worker 1: Waiting for new target
Worker 2: Waiting for new target
Worker 0: Target 1 generating 2 more targets
Worker 0: Target 1 done
Worker 0: Waiting for new target
Worker 0: Processing target 320
Worker 1: Processing target 5807
Worker 0: Target 320 done
Worker 0: Waiting for new target
Worker 1: Target 5807 done
Worker 1: Waiting for new target
关于python - 我是否以正确的方式管理异步任务(python 3.9)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66292395/