被阻止的Python异步函数调用也会阻止另一个异步函数

问题描述

我使用FastAPI来开发访问sql Server的数据层API。 无需使用pytds或pyodbc, 如果存在数据库事务导致任何请求挂起, 所有其他请求将被阻止。 (即使没有数据库操作)

复制:

  1. 故意执行可序列化的sql Server会话,开始事务,并且不回滚或提交
    INSERT INTO [dbo].[KVStore] VALUES ('1','1',0)
    begin tran
    SET TRANSACTION ISOLATION LEVEL Serializable
    SELECT * FROM [dbo].[KVStore]
  1. 使用这样的异步处理程序功能向​​API发送请求:
    def kv_delete_by_key_2_sql():
            conn = pytds.connect(dsn='192.168.0.1',database=cfg.kvStore_db,user=cfg.kvStore_uid,password=cfg.kvStore_upwd,port=1435,autocommit=True)
            engine = conn.cursor()
            try:
                sql = "delete KVStore; commit"

                with concurrent.futures.ThreadPoolExecutor() as executor:
                    future = executor.submit(engine.execute,sql)
                    rs = future.result()
                    j = {
                        'success': True,'rowcount': rs.rowcount
                    }
                    return jsonable_encoder(j)
            except Exception as exn:
                j = {
                    'success': False,'reason': exn_handle(exn)
                }
                return jsonable_encoder(j)

    @app.post("/kvStore/delete")
    async def kv_delete(request: Request,type_: Optional[str] = Query(None,max_length=50)):
        request_data = await request.json()

        return kv_delete_by_key_2_sql()

  1. 并使用如下所示的异步处理程序功能向​​同一应用的API发送请求:

    async def hangit0(request: Request,t: int = Query(0)):
        print(t,datetime.utcNow().strftime('%Y-%m-%d %H:%M:%s.%f')[:-3])
        await asyncio.sleep(t)
        print(t,datetime.utcNow().strftime('%Y-%m-%d %H:%M:%s.%f')[:-3])
        j = {
            'success': True
        }
        return jsonable_encoder(j)
    
    @app.get("/kvStore/hangit/")
    async def hangit(request: Request,t: int = Query(0)):
        return await hangit0(request,t)

我希望step.2挂起,而step.3应该在2秒后直接返回。 但是,如果事务未提交或回滚,则step.3永远不会返回...

如何使这些处理程序功能同时工作?

解决方法

原因是rs = future.result()实际上是一个阻止呼叫-请参见python docs。不幸的是,executor.submit()不会返回等待对象(concurrent.futures.Futureasyncio.Future不同。

您可以使用asyncio.wrap_future来获取concurrent.futures.Future并返回asyncio.Future(请参阅python docs)。可以使用新的Future对象,因此您可以将阻塞函数转换为异步函数。

示例

import asyncio
import concurrent.futures

async def my_async():
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future = executor.submit(lambda x: x + 1,1)
        return await asyncio.wrap_future(future)

print(asyncio.run(my_async()))

在您的代码中,只需将rs = future.result()更改为rs = await asyncio.wrap_future(future)并制成整个函数async。那应该做魔术,祝你好运! :)