使用asyncio恰好100个请求后,并行请求无限阻塞

问题描述

我尝试同时使用httpx和aiohttp,并且都具有此硬编码限制。

import asyncio

import aiohttp
import httpx


async def main():
    client = aiohttp.ClientSession() 
    # client = httpx.AsyncClient(timeout=None)

    coros = [
        client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",params={"symbol": "ADANIENT.NS","interval": "2m","range": "60d",},)
        for _ in range(500)
    ]

    for i,coro in enumerate(asyncio.as_completed(coros)):
        await coro
        print(i,end=",")


asyncio.run(main())

输出-

0、1、2、3、4、5、6、7、8、9、10、11、12、13、14、15、16、17、18、19、20、21、22、23 ,24、25、26、27、28、29、30、31、32、33、34、35、36、37、38、39、40、41、42、43、44、45、46、47、48 ,49、50、51、52、53、54、55、56、57、58、59、60、61、62、63、64、65、66、67、68、69、70、71、72、73 ,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98 ,99

两个库都只停留在99位

但是,如果每个请求都使用一个新的会话,则不会发生这种情况。

我在做什么错?异步不是使事情变得如此简单的全部目的吗?


我尝试用线程,zmq和请求重写它,并且效果很好-

import zmq

N_WORKERS = 100
N_ITERS = 500

ctx = zmq.Context.instance()


def worker():
    client = requests.Session()

    pull = ctx.socket(zmq.PULL)
    pull.connect("inproc://#1")

    push = ctx.socket(zmq.PUSH)
    push.connect("inproc://#2")

    while True:
        if not pull.recv_pyobj():
            return

        r = client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",)
        push.send_pyobj(r.content)


def ventilator():
    push = ctx.socket(zmq.PUSH)
    push.bind("inproc://#1")

    # distribute tasks to all workers
    for _ in range(N_ITERS):
        push.send_pyobj(True)

    # close down workers
    for _ in range(N_WORKERS):
        push.send_pyobj(False)



# start workers & ventilator
threads = [Thread(target=worker) for _ in range(N_WORKERS)]
threads.append(Thread(target=ventilator))
for t in threads:
    t.start()

# pull results from workers
pull = ctx.socket(zmq.PULL)
pull.bind("inproc://#2")

for i in range(N_ITERS):
    pull.recv_pyobj()
    print(i,")

# wait for workers to exit
for t in threads:
    t.join()

解决方法

问题是您client.get(...)将具有实时句柄的请求对象返回到OS级套接字。无法关闭该对象会导致aiohttp用完套接字,即达到连接器限制100 by default

要解决此问题,您需要关闭client.get()返回的对象,或使用async with,这将确保对象with完成后立即关闭。例如:

async def get(client):
    async with client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",params={"symbol": "ADANIENT.NS","interval": "2m","range": "60d",}) as resp:
        pass

async def main():
    async with aiohttp.ClientSession() as client:
        coros = [get(client) for _ in range(500)]
        for i,coro in enumerate(asyncio.as_completed(coros)):
            await coro
            print(i,end=",",flush=True)

asyncio.run(main())

此外,aiohttp.ClientSession对象也应该关闭,这也可以使用async with完成,如上所示。

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...