我正在尝试使用trio
和asks
制作一个简单的网络爬虫。我使用 Nursery 一次启动几个爬虫,并使用内存 channel 来维护要访问的 url 列表。
每个爬虫都会接收该 channel 两端的克隆,因此它们可以抓取一个网址(通过 receive_channel),读取它,查找并添加要访问的新网址(通过 send_channel)。
async def main():
send_channel, receive_channel = trio.open_memory_channel(math.inf)
async with trio.open_nursery() as nursery:
async with send_channel, receive_channel:
nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
async def crawler(send_channel, receive_channel):
async for url in receive_channel: # I'm a consumer!
content = await ...
urls_found = ...
for u in urls_found:
await send_channel.send(u) # I'm a producer too!
在这种情况下,消费者是生产者。如何优雅地停止一切?
关闭一切的条件是:
- channel 为空
- 并且
- 所有抓取工具都停留在第一个 for 循环,等待 url 出现在 receive_channel 中(这……不会再发生了)
我尝试在crawler()
内使用async with send_channel
,但找不到一个好的方法。我还尝试找到一些不同的方法(一些内存 channel 绑定(bind)的工作池等),但这里也没有运气。
最佳答案
这里至少有两个问题。
首先是您关于在 channel 为空时停止的假设。由于您分配的内存 channel 大小为 0,因此它将始终为空。仅当爬网程序已准备好接收 URL 时,您才能传递该 URL。
这产生了第二个问题。如果您发现的网址多于分配给爬虫的网址,您的应用程序将陷入僵局。
原因是,由于您无法将所有找到的网址交给爬虫,爬虫将永远无法准备好接收要爬行的新网址,因为它会卡在等待另一个爬虫获取其中之一它的网址。
情况会变得更糟,因为假设其他爬虫之一发现了新的 url,它们也会被困在已经等待移交其 url 的爬虫后面,并且它们将永远无法获取新的 url 之一。等待处理。
文档的相关部分:
https://trio.readthedocs.io/en/stable/reference-core.html#buffering-in-channels
假设我们解决了这个问题,下一步该去哪里?
您可能需要保留所有访问过的网址的列表(设置?),以确保您不会再次访问它们。
要真正弄清楚何时停止,与其关闭 channel ,不如直接取消托儿所可能要容易得多。
假设我们像这样修改主循环:
async def main():
send_channel, receive_channel = trio.open_memory_channel(math.inf)
active_workers = trio.CapacityLimiter(3) # Number of workers
async with trio.open_nursery() as nursery:
async with send_channel, receive_channel:
nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
while True:
await trio.sleep(1) # Give the workers a chance to start up.
if active_workers.borrowed_tokens == 0 and send_channel.statistics().current_buffer_used == 0:
nursery.cancel_scope.cancel() # All done!
现在我们需要稍微修改爬虫,以便在事件时获取 token 。
async def crawler(active_workers, send_channel, receive_channel):
async for url in receive_channel: # I'm a consumer!
with active_workers:
content = await ...
urls_found = ...
for u in urls_found:
await send_channel.send(u) # I'm a producer too!
其他需要考虑的事情 -
您可能想在抓取工具中使用send_channel.send_noblock(u)
。由于您有一个无界的缓冲区,因此不会出现 WouldBlock 异常,并且每次发送时不触发检查点的行为可能是理想的。这样您就可以确定,在其他任务有机会获取新 url 或父任务有机会检查工作是否完成之前,特定 url 已完全处理并且所有新 url 已添加。
关于Python和Trio,生产者是消费者,工作完成后如何优雅退出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65304775/