如何让某些异步脚本继续抓取,目前该脚本在执行过程中卡在某个地方

问题描述

我已经使用asyncio库创建了一个脚本来解析网页中其他帖子所有者的名字。想法是在脚本中提供此link,该脚本解析每个页面上不同帖子的所有链接,并遍历下一页以执行相同操作。但是,脚本然后使用此功能fetch_again()中的所有链接来访问内部页面,以获取所有帖子的所有者。

尽管我可以从登录页面删除所有者的名称,但是我使用以下方法只是为了知道如何使用我尝试的设计实现相同的目的。我在脚本中使用了semaphore来限制请求数。

当我使用以下脚本时,发现它在100篇或更多篇文章中起作用,然后卡住了。它不会引发任何错误

我尝试过:

import aiohttp
import asyncio
from lxml.html import fromstring
from urllib.parse import urljoin

link = "https://stackoverflow.com/questions/tagged/web-scraping"

semaphore = asyncio.Semaphore(10)

async def fetch(url):
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                text = await response.text()
                result = await processing_docs(session,text)
            return result

async def processing_docs(session,html):
    coros = []
    tree = fromstring(html)
    titles = [urljoin(link,title.attrib['href']) for title in tree.cssselect(".summary .question-hyperlink")]
    for title in titles:
        coros.append(fetch_again(session,title))

    next_page = tree.cssselect("div.pager a[rel='next']")
    if next_page:
        page_link = urljoin(link,next_page[0].attrib['href'])
        coros.append(fetch(page_link))
    await asyncio.gather(*coros)

async def fetch_again(session,url):
    async with semaphore:
        async with session.get(url) as response:
            text = await response.text()
            tree = fromstring(text)
            title = tree.cssselect("h1[itemprop='name'] a")[0].text
            print(title)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(asyncio.gather(*(fetch(url) for url in [link])))
    loop.run_until_complete(future)
    loop.close()

如何让脚本继续分析当前卡在执行过程中的某个地方?

解决方法

脚本可能由于死锁而阻塞:fetch获取信号量并调用processing_docs,从而递归调用fetch和{{1 }} 仍保持信号量。如果fetch_again的递归深度达到10,则最里面的fetch将永远不会获得该信号量,因为它已被其调用者所获取。我建议您用fetch替换递归,并用固定数量的工作程序任务耗尽(并填充)队列。这样一来,您甚至都不需要信号灯,并且可以保证不会死锁。

一个不需要重构的更简单的修复方法是,只需将递归调用移至asyncio.Queue块之外的processing_docs(),即在释放信号量的情况下调用async with semaphore。毕竟,信号量的目的是限制对远程服务器的并发访问,而不是限制本地处理,因为异步处理是单线程的,本地处理首先不是并发的。那应该消除死锁,同时仍然限制并发连接的数量:

processing_docs()

还请注意,您可能应该在顶级协程中创建一个会话,并将其传播到整个代码中。您已经在async def fetch(url): async with aiohttp.ClientSession() as session: async with semaphore: async with session.get(url) as response: text = await response.text() result = await processing_docs(session,text) return result fetchprocessing_docs之间进行了此操作,但是对于对fetch_again的顶级调用也可以进行此操作。