我想知道如何在 Python 中实现一个任务管理器来重新启动失败的任务。
这是我经过一番思考后得出的结论,但这看起来像是一个黑客。有没有更好的方法来实现这种“ self 修复”的任务组模式?
import asyncio, random
async def noreturn(_arg):
while True:
await asyncio.sleep(1)
if random.randint(0, 10) % 10 == 0:
raise random.choice((RuntimeError, ValueError, TimeoutError))
async def main():
taskmap: dict[int, asyncio.Task] = {}
for i in range(10):
taskmap[i] = asyncio.create_task(noreturn(i))
while True:
for arg, task in taskmap.items():
if task.done():
# Task died
taskmap[arg] = asyncio.create_task(noreturn(arg))
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(main())
预先感谢您的帮助。
最佳答案
这将按原样工作,如果您的核心内容足够简单,可以适应,则按原样进行。
这种方法的主要问题是它是“硬编码的”,即:您的检查循环具有已终止任务所需的所有信息,并且可以通过再次使用相同参数调用相同的协同例程来重新创建它。在较大的系统中,人们会期望您有多个混合任务,并且并不总是手头有它们的初始参数以便在需要时重新创建任务。
因此,一种更适合您的模式是拥有一个中间层,它将保存任务的初始参数 - 或者更好的是,保存到其内部协同例程函数,然后重新创建异步 -每当失败时就根据需要执行任务。
可以根据需要对该层进行可观察性检测(即在失败、重试时生成日志)、重试尝试、重试间隔等。
您可以继承asyncio.task
,并编写包装代码并将其设置为运行循环中的task_factory。不幸的是,您将无法像往常一样简单地实例化您的类(asyncio.create_task
),甚至无法自定义task_factory,因为这需要一个已经创建的协同例程 - 无论您在哪里需要记下您的协同例程参数,以便您可以在失败时重新创建底层协同例程。
代码可以按照下面的示例编写。如果这对生产至关重要,则可能存在未涵盖的边缘情况,我建议您联系专家以获得生产强度代码。尽管如此,这应该可行:
class RetrieableTask: #(asyncio.Task):
retiable = RuntimeError, ValueError, # ...
def __init__(self, coroutine_function, args=(), kwargs=None, name=None, context=None, name=None,... ): # retry extrategies can be parametrized
self.coroutine_function = coroutine_function
self.args = args
self.kwargs = kwargs or {}
self.context = context
self.name = name
self.start_task()
def start_task(self):
self.inner = asyncio.create_task(self.coroutine_function(*self.args, self.**kwargs)
context=self.context)
self.inner.set_name(self.name)
def done(self):
result = self.inner..done()
if result:
exception = self.inner.exception() # may raise CancelledError: just let it bubble through
if exception and isinstance(exception, self.retriable):
# if needed log, and check retry policies
self.start_task()
return False
# bridge other task methods to the inner task as needed:
def result(self):
return self.inner.result()
def exception(self):
return self.inner.exception()
def cancel(self, msg=None):
return self.inner.cancel(msg)
def set_name(self, name):
self.name = name
self.inner.set_name(name)
def get_name(self):
return self.name
# repeat proxying of methods as needed for
# the methods documented in https://docs.python.org/3/library/asyncio-task.html#task-object
这就是它的使用方式:
async def noreturn(_arg):
while True:
await asyncio.sleep(1)
if random.randint(0, 10) % 10 == 0:
raise random.choice((RuntimeError, ValueError, TimeoutError))
async def main():
tasks = []
for i in range(10):
tasks.append(RetriableTask(noreturn, args=(i,))
while any(not task.done() for task in tasks):
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(main())
关于python - Python 中的 self 修复任务组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75747229/