python - Python 中的 self 修复任务组

标签 python python-asyncio self-healing

我想知道如何在 Python 中实现一个任务管理器来重新启动失败的任务。

这是我经过一番思考后得出的结论,但这看起来像是一个黑客。有没有更好的方法来实现这种“ self 修复”的任务组模式?

import asyncio, random

async def noreturn(_arg):
    while True:
        await asyncio.sleep(1)
        
        if random.randint(0, 10) % 10 == 0:
            raise random.choice((RuntimeError, ValueError, TimeoutError))


async def main():
    taskmap: dict[int, asyncio.Task] = {}

    for i in range(10):
        taskmap[i] = asyncio.create_task(noreturn(i))
    
    while True:
        for arg, task in taskmap.items():
            if task.done():
                # Task died
                taskmap[arg] = asyncio.create_task(noreturn(arg))

        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())

预先感谢您的帮助。

最佳答案

这将按原样工作,如果您的核心内容足够简单,可以适应,则按原样进行。

这种方法的主要问题是它是“硬编码的”,即:您的检查循环具有已终止任务所需的所有信息,并且可以通过再次使用相同参数调用相同的协同例程来重新创建它。

在较大的系统中,人们会期望您有多个混合任务,并且并不总是手头有它们的初始参数以便在需要时重新创建任务。

因此,一种更适合您的模式是拥有一个中间层,它将保存任务的初始参数 - 或者更好的是,保存到其内部协同例程函数,然后重新创建异步 -每当失败时就根据需要执行任务。

可以根据需要对该层进行可观察性检测(即在失败、重试时生成日志)、重试尝试、重试间隔等。

您可以继承asyncio.task,并编写包装代码并将其设置为运行循环中的task_factory。不幸的是,您将无法像往常一样简单地实例化您的类(asyncio.create_task),甚至无法自定义task_factory,因为这需要一个已经创建的协同例程 - 无论您在哪里需要记下您的协同例程参数,以便您可以在失败时重新创建底层协同例程。

代码可以按照下面的示例编写。如果这对生产至关重要,则可能存在未涵盖的边缘情况,我建议您联系专家以获得生产强度代码。尽管如此,这应该可行:

    
class RetrieableTask:   #(asyncio.Task):
    retiable = RuntimeError, ValueError, # ...
    
    def __init__(self, coroutine_function, args=(), kwargs=None, name=None, context=None, name=None,... ): # retry extrategies can be parametrized
        self.coroutine_function = coroutine_function
        self.args = args 
        self.kwargs = kwargs or {}
        self.context = context
        self.name = name
        self.start_task()
        
    def start_task(self):
        self.inner = asyncio.create_task(self.coroutine_function(*self.args, self.**kwargs)
                                         context=self.context)
        self.inner.set_name(self.name)
        
    
    def done(self):
        result = self.inner..done()
        if result:
            exception = self.inner.exception() # may raise CancelledError: just let it bubble through
            if exception and isinstance(exception, self.retriable):
                # if needed log, and check retry policies
                self.start_task()
                return False
    # bridge other task methods to the inner task as needed:
    def result(self):
        return self.inner.result()
    def exception(self):
        return self.inner.exception()
    def cancel(self, msg=None):
        return self.inner.cancel(msg)
    
    def set_name(self, name):
        self.name = name
        self.inner.set_name(name)
    def get_name(self):
        return self.name
    # repeat proxying of methods as needed for
    # the methods documented in https://docs.python.org/3/library/asyncio-task.html#task-object
    

这就是它的使用方式:

async def noreturn(_arg):
    while True:
        await asyncio.sleep(1)

        if random.randint(0, 10) % 10 == 0:
            raise random.choice((RuntimeError, ValueError, TimeoutError))


async def main():
    tasks = []

    for i in range(10):
        tasks.append(RetriableTask(noreturn, args=(i,))

    while any(not task.done() for task in tasks):
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())

关于python - Python 中的 self 修复任务组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75747229/

相关文章:

Linux self 修复脚本来检查某些进程

javascript - Scrapy splash 响应不会返回完整的 html

python - 将每个数字数组元素绘制为一条水平线

python - @asyncio.coroutine 与 async def

python-3.x - 将tornado与aiohttp(或其他基于asyncio的库)一起使用

python - 为由 urlopen() 或 requests.get() 创建的类文件对象提供文件名

Python matplotlib 线框失真

python - 如何将 asyncio 任务添加到 pyqt5 事件循环,以便它运行并避免从未等待的错误?

web-config - web.config <system.webServer> 错误