Python和Trio,生产者是消费者,工作完成后如何优雅退出?

标签 python channel consumer producer python-trio

我正在尝试使用trioasks制作一个简单的网络爬虫。我使用 Nursery 一次启动几个爬虫,并使用内存 channel 来维护要访问的 url 列表。

每个爬虫都会接收该 channel 两端的克隆,因此它们可以抓取一个网址(通过 receive_channel),读取它,查找并添加要访问的新网址(通过 send_channel)。

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    async with trio.open_nursery() as nursery:
        async with send_channel, receive_channel:
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())


async def crawler(send_channel, receive_channel):
    async for url in receive_channel:  # I'm a consumer!
        content = await ...
        urls_found = ...
        for u in urls_found:
            await send_channel.send(u)  # I'm a producer too!

在这种情况下,消费者生产者。如何优雅地停止一切?

关闭一切的条件是:

  • channel 为空
  • 并且
  • 所有抓取工具都停留在第一个 for 循环,等待 url 出现在 receive_channel 中(这……不会再发生了)

我尝试在crawler()内使用async with send_channel,但找不到一个好的方法。我还尝试找到一些不同的方法(一些内存 channel 绑定(bind)的工作池等),但这里也没有运气。

最佳答案

这里至少有两个问题。

首先是您关于在 channel 为空时停止的假设。由于您分配的内存 channel 大小为 0,因此它将始终为空。仅当爬网程序已准备好接收 URL 时,您才能传递该 URL。

这产生了第二个问题。如果您发现的网址多于分配给爬虫的网址,您的应用程序将陷入僵局。

原因是,由于您无法将所有找到的网址交给爬虫,爬虫将永远无法准备好接收要爬行的新网址,因为它会卡在等待另一个爬虫获取其中之一它的网址。

情况会变得更糟,因为假设其他爬虫之一发现了新的 url,它们也会被困在已经等待移交其 url 的爬虫后面,并且它们将永远无法获取新的 url 之一。等待处理。

文档的相关部分:

https://trio.readthedocs.io/en/stable/reference-core.html#buffering-in-channels

假设我们解决了这个问题,下一步该去哪里?

您可能需要保留所有访问过的网址的列表(设置?),以确保您不会再次访问它们。

要真正弄清楚何时停止,与其关闭 channel ,不如直接取消托儿所可能要容易得多。

假设我们像这样修改主循环:

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    active_workers = trio.CapacityLimiter(3) # Number of workers
    async with trio.open_nursery() as nursery:
        async with send_channel, receive_channel:
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            while True:
                await trio.sleep(1) # Give the workers a chance to start up.
                if active_workers.borrowed_tokens == 0 and send_channel.statistics().current_buffer_used == 0:
                    nursery.cancel_scope.cancel() # All done!

现在我们需要稍微修改爬虫,以便在事件时获取 token 。

async def crawler(active_workers, send_channel, receive_channel):
    async for url in receive_channel:  # I'm a consumer!
        with active_workers:
            content = await ...
            urls_found = ...
            for u in urls_found:
                await send_channel.send(u)  # I'm a producer too!

其他需要考虑的事情 -

您可能想在抓取工具中使用send_channel.send_noblock(u)。由于您有一个无界的缓冲区,因此不会出现 WouldBlock 异常,并且每次发送时不触发检查点的行为可能是理想的。这样您就可以确定,在其他任务有机会获取新 url 或父任务有机会检查工作是否完成之前,特定 url 已完全处理并且所有新 url 已添加。

关于Python和Trio,生产者是消费者,工作完成后如何优雅退出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65304775/

相关文章:

Python:根据其他两列的值有条件地创建新列

去 channel 死锁问题

Golang 缓冲 channel 在发送之前接收数据

java - 在 Java 8 中模仿 C# 操作

python - 如何在 Django 2.1 中实现新帖子的审核

python - 使用 pandas 将两个不同的数据帧转换为一个 json 文件

python - 解析时获取空列表

Django channel WebSocket 握手期间出错 : Unexpected response code: 500

c# - 如何在 EasyNetQ 中为每个消费者声明自定义错误交换?

c - PThreads.消费者-生产者!打印在不同的文件中