问题描述
代码中的逻辑是通过(异步)HTTP请求提取数据,然后构建大量词典,其中随机生成一个值:
import asyncio
import random
import string
import time
from concurrent.futures import ProcesspoolExecutor
from itertools import cycle
from httpx import Asyncclient
URL = 'http://localhost:8080'
COUNT = 1_000_000
def rand_str(length=10):
return ''.join(random.choice(string.ascii_uppercase) for i in range(length))
def parser(data,count):
items = []
for _,item in zip(range(count),cycle(data)):
item['instance'] = rand_str()
items.append(item)
return items
async def parser_coro(data,cycle(data)):
item['instance'] = rand_str()
items.append(item)
return items
async def run_in_executor(func,pool,*args,**kwargs):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(pool,func,**kwargs)
async def main():
async with Asyncclient(base_url=URL) as client:
r = await client.get('/api/alerts/')
data = r.json()
# Case 1
t1 = time.perf_counter()
parser(data,COUNT)
t2 = time.perf_counter()
print(f'Case 1 - sync: {t2 - t1:.3f}s')
# Case 2
t1 = time.perf_counter()
await parser_coro(data,COUNT)
t2 = time.perf_counter()
print(f'Case 2 - coro (no await): {t2 - t1:.3f}s')
# Case 3
t1 = time.perf_counter()
await run_in_executor(parser,None,data,COUNT)
t2 = time.perf_counter()
print(f'Case 3 - thread executor: {t2 - t1:.3f}s')
# Case 4
t1 = time.perf_counter()
with ProcesspoolExecutor() as executor:
await run_in_executor(parser,executor,COUNT)
t2 = time.perf_counter()
print(f'Case 4 - process executor: {t2 - t1:.3f}s')
if __name__ == '__main__':
asyncio.run(main(),debug=True)
测试:
$ python test.py
Case 1 - sync: 6.593s
Case 2 - coro (no await): 6.565s
Executing <Task pending name='Task-1' coro=<main() running at test.py:63> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/futures.py:360,<TaskWakeupMethWrapper object at 0x7efff962a1f0>()] created at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:422> cb=[_run_until_complete_cb() at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:184] created at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:591> took 13.176 seconds
Case 3 - thread executor: 6.675s
Case 4 - process executor: 6.726s
问题:
我应该在执行程序中运行parser
函数,以便在生成列表时不阻塞主线程,否则在这种情况下将不起作用吗?在这种情况下,这实际上是受cpu或I / O约束的工作量吗?我想没有任何IO,但是正在建立列表是一项cpu密集型任务,因此工作负载受cpu限制了吗?
解决方法
我应该在执行程序中运行
parser
函数,以便在生成列表时不阻塞主线程,或者在这种情况下不起作用吗?
是的,你应该。尽管全局解释器处于锁定状态,但使用单独的线程还是有帮助的,因为Python将允许执行从解析切换到异步线程,而parser
不会意识到这一点。因此,使用线程可以防止事件循环被阻塞6秒钟,或运行该函数所需的时间。
请注意,parser_coro
变体与没有执行程序的parser
变体没有什么不同,因为它不会等待任何东西。 await parser_coro(...)
将停止发泄循环,就像对parser(...)
的无执行者调用一样。
在这种情况下,这实际上是CPU或I / O约束的工作量吗?
我无法评论其余的工作量,但是编写的功能肯定是受CPU限制的。
我可以在
ThreadPoolExecutor
中运行它,以便它不会阻塞,或者它必须是ProcessPoolExecutor
,因为它是受CPU限制的功能?
您可以在ThreadPoolExecutor
中运行它。只是如果您有一堆并行运行,它们将共享相同的CPU内核。 (但是它们不会阻塞其他协程,因为它们将在事件循环线程中运行。)