我正在尝试使用预定义数量的工作人员进行一些网络抓取,作为学习。
我正在使用 None
作为跳出while循环并停止worker的哨兵。
每个 worker 的速度各不相同,在最后一个 worker 之前关闭所有 worker
url 传递给 gather_search_links
获取链接。
我尝试使用 asyncio.Queue
,但我的控制力不如双端队列。
async def gather_search_links(html_sources, detail_urls):
while True:
if not html_sources:
await asyncio.sleep(0)
continue
data = html_sources.pop()
if data is None:
html_sources.appendleft(None)
break
data = BeautifulSoup(data, "html.parser")
result = data.find_all("div", {"data-component": "search-result"})
for record in result:
atag = record.h2.a
url = f'{domain_url}{atag.get("href")}'
detail_urls.appendleft(url)
print("apended data", len(detail_urls))
await asyncio.sleep(0)
async def get_page_source(urls, html_sources):
client = httpx.AsyncClient()
while True:
if not urls:
await asyncio.sleep(0)
continue
url = urls.pop()
print("url", url)
if url is None:
urls.appendleft(None)
break
response = await client.get(url)
html_sources.appendleft(response.text)
await asyncio.sleep(8)
html_sources.appendleft(None)
async def navigate(urls):
for i in range(2, 7):
url = f"https://www.example.com/?page={i}"
urls.appendleft(url)
await asyncio.sleep(0)
nav_urls.appendleft(None)
loop = asyncio.get_event_loop()
nav_html = deque()
nav_urls = deque()
products_url = deque()
navigate_workers = [asyncio.ensure_future(navigate(nav_urls)) for _ in range(1)]
page_source_workers = [asyncio.ensure_future(get_page_source(nav_urls, nav_html)) for _ in range(2)]
product_urls_workers = [asyncio.ensure_future(gather_search_links(nav_html, products_url)) for _ in range(1)]
workers = asyncio.wait([*navigate_workers, *page_source_workers, *product_urls_workers])
loop.run_until_complete(workers)
最佳答案
我是个新手,所以这可能是错误的,但我相信问题在于所有三个函数:navigate()、gather_search_links() 和 get_page_source() 都是异步任务 可以按任何顺序完成 .但是,您检查空双端队列并使用 appendleft 来确保 None 是双端队列中最左边的项目,看起来它们会适本地防止这种情况。出于所有意图和目的,代码看起来应该正确运行。
我认为问题出在这一行:
workers = asyncio.wait([*navigate_workers, *page_source_workers, *product_urls_workers])
根据 this post , asyncio.wait 函数不会根据上面写的顺序对这些任务进行排序,而是根据 IO 作为协程来触发它们。同样,在gather_search_links 和get_page_source 开始时的检查确保一个函数在另一个函数之后运行,因此如果每个函数只有一个工作程序,则此代码应该可以工作。如果每个功能有多个工作人员,我可以看到出现的问题,即 None 最终不会成为双端队列中最左边的项目。也许每个函数末尾的打印语句来显示双端队列的内容对于解决这个问题很有用。我想我的主要问题是,如果您要编写额外的代码,为什么要异步执行这些任务,因为这些步骤必须同步完成?为了获得 HTML,您必须首先拥有 URL。为了抓取 HTML,您必须首先拥有 HTML。 asyncio 在这里提供了什么好处?作为同步任务,所有这三个对我来说都更有意义。获取 URL、获取 HTML、抓取 HTML,然后按此顺序。
编辑:我想到这里异步代码的主要好处是,当您从它们获取 HTML 时,您不想等待每个单独的 URL 同步响应。在这种情况下我会做的是同步收集我的 URL 第一 ,然后将 get 和 scrape 函数组合成一个异步函数,这将是您唯一的异步函数。然后你不需要哨兵或检查“无”值或任何额外的代码,你就可以获得异步获取的完整值。然后,您可以将抓取的数据存储在 future 列表(或双端队列或其他)中。这将简化您的代码并为您提供最快的抓取时间。
最后编辑:
这是我快速而肮脏的重写。我喜欢你的代码,所以我决定自己动手。我不知道它是否有效,我不是 Python 人。
import asyncio
from collections import deque
import httpx as httpx
from bs4 import BeautifulSoup
# Get or build URLs from config
def navigate():
urls = deque()
for i in range(2, 7):
url = f"https://www.example.com/?page={i}"
urls.appendleft(url)
return urls
# Asynchronously fetch and parse data for a single URL
async def fetchHTMLandParse(url):
client = httpx.AsyncClient()
response = await client.get(url)
data = BeautifulSoup(response.text, "html.parser")
result = data.find_all("div", {"data-component": "search-result"})
for record in result:
atag = record.h2.a
#Domain URL was defined elsewhere
url = f'{domain_url}{atag.get("href")}'
products_urls.appendleft(url)
loop = asyncio.get_event_loop()
products_urls = deque()
nav_urls = navigate()
fetch_and_parse_workers = [asyncio.ensure_future(fetchHTMLandParse(url)) for url in nav_urls]
workers = asyncio.wait([*fetch_and_parse_workers])
loop.run_until_complete(workers)
关于python-3.x - Asyncio,由于哨兵问题,任务没有正确完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64426468/