python - Am I managing async tasks the right way (Python 3.9)?

Tags: python python-asyncio python-3.8 python-3.9

I'm working on a scraper (Python 3.9). I need to start make_attempt() and, depending on its result, start other tasks that should run concurrently.

First I create the initial task and append it to the list that stores all async tasks, self.data["worker"]["tasks"], and then I start them: await asyncio.gather(*self.data["worker"]["tasks"])

Inside make_attempt() I await the result of a POST request to the server (I use the aiohttp client) and then, depending on the result, either add new tasks or repeat make_attempt() after a short delay.

I stop the current task, remove it from the list of async tasks, and then add the new tasks.

async def make_attempt(self):
    attempt: int = self.data["res"]["attempt"]

    await self.do_something()
    await sleep(1)

    for task in self.data["worker"]["tasks"]:
        print("Task name: %s" % task.get_name())
        if task.get_name() == str(attempt):
            task.cancel()
    self.data["worker"]["tasks"] = [task for task in self.data["worker"]["tasks"] if task.get_name() != str(attempt)]

    if 1 > 0:  # a condition to start make_attempt() again
        self.data["worker"]["tasks"].append(asyncio.create_task(self.make_attempt(), name=attempt))
        await asyncio.gather(*self.data["worker"]["tasks"])

async def run(self):
    self.data["worker"]["tasks"].append(asyncio.create_task(self.make_attempt(), name=self.data["res"]["attempt"]))
    await asyncio.gather(*self.data["worker"]["tasks"])

I'm new to asyncio, so perhaps you can point out my mistakes or suggest a better implementation.

Update. Here is a schema of what I'm trying to achieve:

(diagram: tasks_schema)

main_task should run and, if its result is OK, start several instances of another task (see Loop 2). When a task in Loop 2 gets its result, a new subtask should run. main_task should wait until all tasks in Loop 2 have finished, or until a timeout fires. All tasks in Loop 2 should work concurrently.
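
In other words, something of this shape — a rough sketch, where loop2_task and subtask are just placeholders for the real work, and the timeout guards the whole of Loop 2:

import asyncio

async def subtask(result):
    # placeholder for the follow-up work triggered by a Loop 2 result
    await asyncio.sleep(0.1)

async def loop2_task(n):
    # placeholder for one Loop 2 task: once its result arrives,
    # it starts a new subtask
    result = await asyncio.sleep(0.5, result=n)  # simulated request
    await subtask(result)

async def main_task():
    # several Loop 2 instances working concurrently
    tasks = [asyncio.create_task(loop2_task(n)) for n in range(3)]
    try:
        # wait until all of Loop 2 is done, or a timeout fires
        await asyncio.wait_for(asyncio.gather(*tasks), timeout=10)
    except asyncio.TimeoutError:
        print("Loop 2 timed out")

asyncio.run(main_task())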

UPD 2. After roughly 1000 iterations of the check_base_url() method, this code raises RecursionError: maximum recursion depth exceeded while calling a Python object

import asyncio
from asyncio import sleep
from typing import Union

from aiohttp import ClientError, ClientResponse, ClientSession
from bs4 import BeautifulSoup

# ScraperError and SolvedCaptcha are the author's own helpers
# (their definitions are not shown here)


class Scraper:
    def __init__(self, data):
        self.data = data

    async def get_response(self, session, url, method="get", *args, **kwargs) -> Union[
            ClientResponse, None]:
        # Retry up to 20 times, backing off briefly on client or HTTP errors
        request = session.get if method == "get" else session.post
        for _ in range(0, 20):
            try:
                response = await request(url, headers={}, proxy="_proxy", *args, **kwargs)
                if response.status > 399:
                    raise ScraperError(response.status)
                await sleep(0.1)
                return response
            except (ClientError, ScraperError):
                await sleep(0.25)
        return None

    async def get_captcha(self) -> SolvedCaptcha:
        # Try up to 20 times and return as soon as a captcha is solved
        for _ in range(0, 20):
            captcha = await self.task_1()
            if captcha:
                return captcha

    async def final_task(self, url) -> bool:
        async with ClientSession(cookies={}) as sess:
            resp_step1: Union[ClientResponse, None] = await self.get_response(sess, "url", "post",
                                                                              data={})
            if not resp_step1:
                return False
            resp_step2: Union[ClientResponse, None] = await self.get_response(sess, "url", "get")
            if not resp_step2:
                return False
            captcha: SolvedCaptcha = await self.get_captcha()
            if not captcha:
                return False
            resp_captcha: Union[ClientResponse, None] = await self.get_response(sess, "url", "post",
                                                                                data={})
            if not resp_captcha:
                return False
            if 2 > 1:  # a placeholder success condition
                print("FINISHED")
                return True
            return False

    async def add_task_3(self) -> None:
        if 2 > 1:
            subtasks = [asyncio.create_task(self.final_task(self.data["res"]["slots_urls"][0]))]
            await asyncio.gather(*subtasks)
        else:
            await self.add_task_3()

    def parse(self, html: str, url: str) -> None:
        soup = BeautifulSoup(html, "lxml")
        # do parsing

    async def task_2(self, url) -> bool:
        async with ClientSession() as sess:
            resp: Union[ClientResponse, None] = await self.get_response(sess, url)
            if not resp:
                return False
            html = await resp.text()
            self.parse(html, url)
            return True

    async def add_task_2(self) -> None:
        if 2 > 1:
            subtasks = [asyncio.create_task(self.task_2(url)) for url in ["url1", "url2"]]
            await asyncio.gather(*subtasks)

    async def task_1(self) -> bool:
        self.data["res"]["captcha_requested"] += 1
        res = await self.captcha.task_1()
        if not res:
            return False
        return True

    async def add_task_1(self) -> None:
        if 2 > 1:
            subtasks = [asyncio.create_task(self.task_1()) for _ in range(0, 5)]
            await asyncio.gather(*subtasks)

    async def get_calendar_url(self, sess) -> bool:
        resp: Union[ClientResponse, None] = await self.get_response(sess, "url", method="post",
                                                                    data={})
        if not resp:
            return False
        else:
            return True

    async def check_base_url(self) -> bool:
        async with ClientSession() as session_0:
            return await self.get_calendar_url(session_0)

    async def schedule_tasks(self):
        def start_again() -> bool:
            if 2 > 1:
                return True
            return False

        res_base_url: bool = await self.check_base_url()
        if res_base_url:
            tasks = [asyncio.create_task(self.add_task_1()),
                     asyncio.create_task(self.add_task_2()),
                     asyncio.create_task(self.add_task_3())]
            await asyncio.gather(*tasks)
            if start_again():
                await sleep(0.1)
                await self.schedule_tasks()
        else:
            await self.schedule_tasks()

    async def run(self):
        await self.schedule_tasks()
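
The RecursionError from UPD 2 comes from schedule_tasks() awaiting itself on every retry: each retry nests another coroutine frame, and the chain only unwinds when the whole thing finishes, so after about 1000 rounds of check_base_url() the interpreter's recursion limit is hit. Rewriting the retry as a loop keeps the stack flat; a rough sketch of the same control flow:

    async def schedule_tasks(self):
        def start_again() -> bool:
            return 2 > 1  # placeholder condition, as in the original

        # Loop instead of awaiting schedule_tasks() recursively:
        # the call stack stays flat no matter how many retries happen
        while True:
            if await self.check_base_url():
                tasks = [asyncio.create_task(self.add_task_1()),
                         asyncio.create_task(self.add_task_2()),
                         asyncio.create_task(self.add_task_3())]
                await asyncio.gather(*tasks)
                if not start_again():
                    break
                await sleep(0.1)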

Best answer

One approach is to use an asyncio.Queue to manage the pending jobs in your application. You then create a number of worker tasks that receive jobs from that queue and can also add new jobs to it.

In the example, asyncio.sleep simulates the actual work (a POST request and processing some data). Some jobs generate new jobs, others don't.

This way the workers pick up and process jobs concurrently.

The Manager creates the job queue and waits until all items have been processed. After that it cancels all the workers and the program terminates.

Code

import asyncio
import random


class Worker:
    def __init__(self, num, target_q):
        # Worker number
        self.num = num
        # The job queue
        self.target_q = target_q
        # Create asyncio task
        self.task = asyncio.create_task(self.run())

    async def run(self):
        # Work on jobs until task is cancelled
        while True:
            print(f"Worker {self.num}: Waiting for new target")

            # Receive a new job from the queue
            target = await self.target_q.get()
            print(f"Worker {self.num}: Processing target {target}")

            try:
                # Simulating some work (e.g. a POST request)
                await asyncio.sleep(1.0)

                # Depending on the outcome, some new work results
                # 0-2 new targets are generated
                new_target_count = random.randint(0, 2)

                if new_target_count > 0:
                    print(
                        f"Worker {self.num}: Target {target} generating {new_target_count} more targets"
                    )
                    for _ in range(new_target_count):
                        # Create a new random target
                        new_target = random.randint(1, 10000)

                        # Put new targets into queue. This will wait if queue is
                        # currently full.
                        await self.target_q.put(new_target)

            finally:
                print(f"Worker {self.num}: Target {target} done")

                # Decrease queue count by one
                self.target_q.task_done()


class Manager:
    def __init__(self):
        # A common queue holding the jobs for the workers. It just stores
        # integers here but could hold any data.
        self.target_q = asyncio.Queue(10)
        # The list of workers
        self.workers = None

    async def run(self):
        # Create some initial work
        await self.target_q.put(1)

        # Create 3 workers
        self.workers = [Worker(num, self.target_q) for num in range(3)]

        # Wait until queue of unfinished tasks is empty
        await self.target_q.join()

        # Cancel other workers
        for worker in self.workers:
            worker.task.cancel()


def main():
    manager = Manager()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(manager.run())
    loop.close()


if __name__ == "__main__":
    main()

Example output

$ python workers_test.py
Worker 0: Waiting for new target
Worker 0: Processing target 1
Worker 1: Waiting for new target
Worker 2: Waiting for new target
Worker 0: Target 1 generating 2 more targets
Worker 0: Target 1 done
Worker 0: Waiting for new target
Worker 0: Processing target 320
Worker 1: Processing target 5807
Worker 0: Target 320 done
Worker 0: Waiting for new target
Worker 1: Target 5807 done
Worker 1: Waiting for new target
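
A note on the shutdown logic: target_q.join() returns only after every item that was ever put() into the queue has had a matching task_done() call, so the Manager waits for the whole job tree, including jobs spawned by other jobs, before cancelling the workers. On Python 3.7+ the entry point could also use asyncio.run(), which creates and closes the event loop itself; a minimal sketch (amain is just a hypothetical name):

import asyncio

async def amain():
    # Create the Manager inside the running loop so its queue binds to
    # the same loop (this matters on Python 3.9 and earlier)
    manager = Manager()
    await manager.run()

if __name__ == "__main__":
    asyncio.run(amain())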

This article is based on the question "python - Am I managing async tasks the right way (Python 3.9)?" on Stack Overflow: https://stackoverflow.com/questions/66292395/
