问题描述
我使用FastAPI来开发访问sql Server的数据层API。 无需使用pytds或pyodbc, 如果存在数据库事务导致任何请求挂起, 所有其他请求将被阻止。 (即使没有数据库操作)
复制:
- 故意执行可序列化的sql Server会话,开始事务,并且不回滚或提交
INSERT INTO [dbo].[KVStore] VALUES ('1','1',0)
begin tran
SET TRANSACTION ISOLATION LEVEL Serializable
SELECT * FROM [dbo].[KVStore]
- 使用这样的异步处理程序功能向API发送请求:
def kv_delete_by_key_2_sql():
conn = pytds.connect(dsn='192.168.0.1',database=cfg.kvStore_db,user=cfg.kvStore_uid,password=cfg.kvStore_upwd,port=1435,autocommit=True)
engine = conn.cursor()
try:
sql = "delete KVStore; commit"
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(engine.execute,sql)
rs = future.result()
j = {
'success': True,'rowcount': rs.rowcount
}
return jsonable_encoder(j)
except Exception as exn:
j = {
'success': False,'reason': exn_handle(exn)
}
return jsonable_encoder(j)
@app.post("/kvStore/delete")
async def kv_delete(request: Request,type_: Optional[str] = Query(None,max_length=50)):
request_data = await request.json()
return kv_delete_by_key_2_sql()
- 并使用如下所示的异步处理程序功能向同一应用的API发送请求:
async def hangit0(request: Request,t: int = Query(0)):
print(t,datetime.utcNow().strftime('%Y-%m-%d %H:%M:%s.%f')[:-3])
await asyncio.sleep(t)
print(t,datetime.utcNow().strftime('%Y-%m-%d %H:%M:%s.%f')[:-3])
j = {
'success': True
}
return jsonable_encoder(j)
@app.get("/kvStore/hangit/")
async def hangit(request: Request,t: int = Query(0)):
return await hangit0(request,t)
我希望step.2挂起,而step.3应该在2秒后直接返回。 但是,如果事务未提交或回滚,则step.3永远不会返回...
如何使这些处理程序功能同时工作?
解决方法
原因是rs = future.result()
实际上是一个阻止呼叫-请参见python docs。不幸的是,executor.submit()
不会返回等待对象(concurrent.futures.Future
与asyncio.Future
不同。
您可以使用asyncio.wrap_future
来获取concurrent.futures.Future
并返回asyncio.Future
(请参阅python docs)。可以使用新的Future
对象,因此您可以将阻塞函数转换为异步函数。
示例:
import asyncio
import concurrent.futures
async def my_async():
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(lambda x: x + 1,1)
return await asyncio.wrap_future(future)
print(asyncio.run(my_async()))
在您的代码中,只需将rs = future.result()
更改为rs = await asyncio.wrap_future(future)
并制成整个函数async
。那应该做魔术,祝你好运! :)