How to chain coroutines in asyncio when fetching nested URLs

Problem description

I am currently designing a spider to crawl a specific website. I can do it synchronously, but I'm trying to wrap my head around asyncio to make it as efficient as possible. I have tried a lot of different approaches, with yield, chained functions and queues, but I can't get it to work.

I am most interested in the design and the logic for solving the problem. Runnable code isn't required, just a sketch that highlights the most important aspects of asyncio. I can't post any code, because my attempts aren't worth sharing.

The task:

exemple.com (I know, it should be example.com) has the following design:

[image: site structure diagram]

In a synchronous way, the logic would be something like this:

for table in my_url_list:
    # Get HTML
    # Extract urls from HTML to user_list
    for user in user_list:
        # Get HTML
        # Extract urls from HTML to user_subcat_list
        for subcat in user_subcat_list:
            # extract content

But now I want to scrape the site asynchronously. Let's say we use 5 instances (tabs in pyppeteer or requests in aiohttp) to parse the content. How should we design it so that it is as efficient as possible, and what asyncio syntax should we use?

Update

Thanks to @user4815162342 for solving my problem. I've been working through his solution, and I'm posting runnable code below in case anyone else wants to experiment with asyncio.

import asyncio
import random
 
my_url_list = ['exemple.com/table1','exemple.com/table2','exemple.com/table3']


# Random sleeps to simulate requests to the server
async def randsleep(caller=None):
    i = random.randint(1,6)
    if caller:
        print(f"Request HTML for {caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)


async def process_urls(url_list):
    print(f'async def process_urls: added {url_list}')
    limit = asyncio.Semaphore(5)
    coros = [process_user_list(table,limit) for table in url_list]
    await asyncio.gather(*coros)


async def process_user_list(table,limit):
    async with limit:
        # Simulate HTML request and extracting urls to populate user_list
        await randsleep(table)
        if table[-1] == '1':
            user_list = ['exemple.com/user1','exemple.com/user2','exemple.com/user3']
        elif table[-1] == '2':
            user_list = ['exemple.com/user4','exemple.com/user5','exemple.com/user6']
        else:
            user_list = ['exemple.com/user7','exemple.com/user8','exemple.com/user9']

        print(f'async def process_user_list: Extracted {user_list} from {table}')

    # Execute process_user in parallel,but do so outside the `async with`
    # because process_user will also need the semaphore,and we don't need
    # it any more since we're done with fetching HTML.
    coros = [process_user(user,limit) for user in user_list]
    await asyncio.gather(*coros)


async def process_user(user,limit):
    async with limit:
        # Simulate HTML request and extracting urls to populate user_subcat_list
        await randsleep(user)
        user_subcat_list = [user + '/profile',user + '/info',user + '/followers']
        print(f'async def process_user: Extracted {user_subcat_list} from {user}')

    coros = [process_subcat(subcat,limit) for subcat in user_subcat_list]
    await asyncio.gather(*coros)


async def process_subcat(subcat,limit):
    async with limit:
        # Simulate HTML request and extracting content
        await randsleep(subcat)
        print(f'async def process_subcat: Extracted content from {subcat}')

if __name__ == '__main__':
    asyncio.run(process_urls(my_url_list))

Solution

Let's restructure the synchronous code so that each piece that accesses the network is a separate function. The functionality is unchanged, but it will make things easier later:

def process_urls(url_list):
    for table in url_list:
        process_user_list(table)

def process_user_list(table):
    # Get HTML,extract user_list
    for user in user_list:
        process_user(user)

def process_user(user):
    # Get HTML,extract user_subcat_list
    for subcat in user_subcat_list:
        process_subcat(subcat)

def process_subcat(subcat):
    # get HTML,extract content

if __name__ == '__main__':
    process_urls(my_url_list)

Assuming the processing order doesn't matter, we want the async version to run in parallel all the functions that are currently called in for loops. They will still run in a single thread, but they will await anything that might block, allowing the event loop to parallelize the waiting and drive them to completion by resuming each coroutine whenever it is ready to proceed. This is achieved by spawning each coroutine as a separate task that runs independently of the other tasks, and therefore in parallel. For example, a sequential (but still async) version of process_urls would look like this:

async def process_urls(url_list):
    for table in url_list:
        await process_user_list(table)

This is async in the sense that it runs inside the event loop, and you could run several such functions in parallel (we'll get to that shortly), but it is also sequential because it chooses to await each invocation of process_user_list. The await explicitly instructs asyncio to suspend the execution of process_urls until the result of process_user_list is available.

What we want instead is to tell asyncio to run all the invocations of process_user_list in parallel, and to be informed when all of them are done. The basic primitive for spawning a coroutine in the "background" is to schedule it as a task using asyncio.create_task, the closest asyncio equivalent of a lightweight thread. Using create_task, the parallel version of process_urls would look like this:

async def process_urls(url_list):
    # spawn a task for each table
    tasks = []
    for table in url_list:
        task = asyncio.create_task(process_user_list(table))
        tasks.append(task)
    # The tasks are now all spawned,so awaiting one task lets
    # them all run.
    for task in tasks:
        await task

The second loop looks like it waits for the tasks one after the other, but since each await runs all the spawned tasks, the total waiting time will be no longer than that of the longest task, regardless of the order in which they finish.
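
A minimal self-contained sketch of that effect, with asyncio.sleep standing in for network waits (the work and main names are just for this illustration):

import asyncio
import time

async def work(seconds):
    # Stand-in for a network request that takes `seconds` to complete.
    await asyncio.sleep(seconds)

async def main():
    start = time.monotonic()
    # All three tasks are scheduled as soon as they are created...
    tasks = [asyncio.create_task(work(s)) for s in (1, 2, 3)]
    # ...so awaiting them one after another still takes ~3 seconds
    # (the longest task), not 1 + 2 + 3 = 6 seconds.
    for task in tasks:
        await task
    print(f"elapsed: {time.monotonic() - start:.1f}s")

asyncio.run(main())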

This pattern is used so often that asyncio provides a dedicated utility function for it, asyncio.gather. Using it, the same code can be expressed in a shorter form:

async def process_urls(url_list):
    coros = [process_user_list(table) for table in url_list]
    await asyncio.gather(*coros)

But there is one more thing to watch out for: since process_user_list fetches HTML from the server, and many instances of it will run in parallel, we can't allow it to hammer the server with hundreds of simultaneous connections. We could create a pool of worker tasks and some kind of queue, but asyncio offers a more elegant solution: the semaphore, asyncio.Semaphore. A semaphore is a synchronization device that doesn't allow more than a pre-determined number of activations in parallel, making the rest wait in line.
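
As a small illustration of that behaviour (the fetch name, the counter and the sleep are only stand-ins for real requests, not part of the answer's code): twenty coroutines are spawned, but at most five are ever inside the async with block at the same time:

import asyncio

async def fetch(i, limit, counter):
    async with limit:
        # Count how many coroutines currently hold the semaphore.
        counter[0] += 1
        print(f"request {i}: {counter[0]} in flight")  # never more than 5
        await asyncio.sleep(0.1)                       # stand-in for the HTTP request
        counter[0] -= 1

async def main():
    limit = asyncio.Semaphore(5)
    counter = [0]
    await asyncio.gather(*(fetch(i, limit, counter) for i in range(20)))

asyncio.run(main())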

The final version of process_urls creates the semaphore and passes it down the call chain. It doesn't activate the semaphore itself, because process_urls doesn't actually fetch any HTML, so there is no reason for it to hold a semaphore slot while the process_user_list instances are running.

async def process_urls(url_list):
    limit = asyncio.Semaphore(5)
    coros = [process_user_list(table,limit) for table in url_list]
    await asyncio.gather(*coros)

process_user_list looks similar, but it does need to activate the semaphore, using async with:

async def process_user_list(table,limit):
    async with limit:
        # Get HTML using aiohttp,extract user_list

    # Execute process_user in parallel,but do so outside the `async with`
    # because process_user will also need the semaphore,and we don't need
    # it any more since we're done with fetching HTML.
    coros = [process_user(user,limit) for user in user_list]
    await asyncio.gather(*coros)

process_user and process_subcat are much the same:

async def process_user(user,limit):
    async with limit:
        # Get HTML,extract user_subcat_list
    coros = [process_subcat(subcat,limit) for subcat in user_subcat_list]
    await asyncio.gather(*coros)

async def process_subcat(subcat,limit):
    async with limit:
        # get HTML,extract content
    # do something with content

if __name__ == '__main__':
    asyncio.run(process_urls(my_url_list))

In practice you will probably want the async functions to share the same aiohttp session, so you would create it in the top-level function (process_urls in your case) and pass it down along with the semaphore. Each function that fetches HTML would then contain another async with for the aiohttp request/response, e.g.:

async with limit:
    async with session.get(url,params...) as resp:
        # get HTML data here
        resp.raise_for_status()
        resp = await resp.read()
# extract content from HTML data here

The two async with blocks can be collapsed into one, reducing the level of indentation while keeping the same meaning:

async with limit,session.get(url,params...) as resp:
    # get HTML data here
    resp.raise_for_status()
    resp = await resp.read()
# extract content from HTML data here
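
To make the shared-session idea concrete, here is a small self-contained sketch of the top of such a hierarchy; fetch_html and the example.com URLs are illustrative placeholders, not part of the original answer:

import asyncio
import aiohttp

my_url_list = ['https://example.com/table1', 'https://example.com/table2']

async def fetch_html(url, limit, session):
    # Hold a semaphore slot only for the duration of the request itself.
    async with limit, session.get(url) as resp:
        resp.raise_for_status()
        return await resp.text()

async def process_urls(url_list):
    limit = asyncio.Semaphore(5)
    # One ClientSession shared by every coroutine; aiohttp reuses its
    # connection pool instead of opening a new connection per request.
    async with aiohttp.ClientSession() as session:
        pages = await asyncio.gather(*(fetch_html(url, limit, session)
                                       for url in url_list))
    for url, html in zip(url_list, pages):
        print(url, len(html))

if __name__ == '__main__':
    asyncio.run(process_urls(my_url_list))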