问题描述
我正在尝试使用aioodbc同时发送多个查询到配置单元,并将这些查询的结果作为列表返回。我想要一个函数,将查询列表作为输入,异步将其发送到hive,并返回python数据帧的列表或字典。
到目前为止,只有在我预先在函数之外创建一个hohodler字典,然后在获取结果时更新该列表,我才能获取结果字典。
下面是可以工作并且几乎满足我需要的示例,我只是怀疑它不是一个好的代码,使用起来也不像我想要的那么简单:
import asyncio
import aioodbc
import time
import pandas as pd
cstr = """DRIVER=my_hive_driver;
HOST=my_host;
PORT=10000;
AuthMech=1;
"""
async def _async_query(query,cstr,output_store = None,query_key = None):
loop = asyncio.get_event_loop()
print(time.ctime(time.time()),' >>> running query: ',query)
# establisth connection
conn = await aioodbc.connect(dsn = cstr,loop = loop,autocommit = True)
# get a cursor
cur = await conn.cursor()
# execute query
print(time.ctime(time.time()),' >>> hive execution started: ',query)
await cur.execute(query)
print(time.ctime(time.time()),' >>> fetching data: ',query)
# if query returns results,turn them to dataframe
if cur.description is not None:
rows = await cur.fetchall()
print(time.ctime(time.time()),' >>> data fetched: ',query)
out = pd.DataFrame((tuple(i) for i in rows),columns = [metadata[0] for metadata in cur.description])
# store dataframe and query to a placeholder dictionary
if output_store is not None and query_key is not None:
output_store.update({query_key:{"query":query,"result":out}})
print(out.tail())
#print(rows)
# close connection
await cur.close()
await conn.close()
print(time.ctime(time.time()),' >>> finished query: ',query)
如果我只想执行一个查询,我可以简单地做到:
loop = asyncio.get_event_loop()
placeholder = {}
loop.run_until_complete(_async_query(query = "select * from my_schema.my_table limit 10",cstr = cstr,output_store = placeholder,query_key = 1))
将运行查询并将数据作为数据框添加到placholder字典中。
我真正想做的是执行查询列表。因此,我做了一个助手函数,该函数针对列表中的每个查询调用了上述函数:
def async_query(query_list,output_store =None):
loop = asyncio.get_event_loop()
to_run = [_async_query(query = i,query_key = h,output_store = output_store) for h,i in enumerate(query_list)]
cors = asyncio.wait(to_run)
#results = await asyncio.gather(*to_run)
output_store = loop.run_until_complete(asyncio.gather(*cors))
#return await results
loop.close()
然后我可以像下面这样使用此功能:
queries = ["select * from my_schema.my_table limit 10;","select * from my_schema.my_table limit 20;","select * from my_schema.my_table limit 40;"]
placeholder = {}
async_query(query_list = queries,output_store = placeholder)
然后,占位符字典将存储我的每个查询和所需的结果。
我的问题是:我应该如何更改代码,以便可以在不将空的hohodler传递给函数的情况下返回字典或结果数据帧列表?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)