python函数可将查询列表异步发送到数据库并获取结果

问题描述

我正在尝试使用aioodbc同时发送多个查询到配置单元,并将这些查询的结果作为列表返回。我想要一个函数,将查询列表作为输入,异步将其发送到hive,并返回python数据帧的列表或字典。

到目前为止,只有在我预先在函数之外创建一个hohodler字典,然后在获取结果时更新该列表,我才能获取结果字典。

下面是可以工作并且几乎满足我需要的示例,我只是怀疑它不是一个好的代码,使用起来也不像我想要的那么简单:

import asyncio
import aioodbc
import time
import pandas as pd

cstr = """DRIVER=my_hive_driver;
HOST=my_host;
PORT=10000;
AuthMech=1;
"""


async def _async_query(query,cstr,output_store = None,query_key = None):
    loop = asyncio.get_event_loop()
    print(time.ctime(time.time()),' >>> running query: ',query)
    # establisth connection
    conn = await aioodbc.connect(dsn = cstr,loop = loop,autocommit = True)
    # get a cursor
    cur = await conn.cursor()
    # execute query
    print(time.ctime(time.time()),' >>> hive execution started: ',query)
    await cur.execute(query)
    print(time.ctime(time.time()),' >>> fetching data: ',query)
    # if query returns results,turn them to dataframe
    if cur.description is not None:
        rows = await cur.fetchall()
        print(time.ctime(time.time()),' >>> data fetched: ',query)
        out = pd.DataFrame((tuple(i) for i in rows),columns = [metadata[0] for metadata in cur.description])
        # store dataframe and query to a placeholder dictionary
        if output_store is not None and query_key is not None:
            output_store.update({query_key:{"query":query,"result":out}})
        print(out.tail())
        #print(rows)
    # close connection
    await cur.close()
    await conn.close()
    print(time.ctime(time.time()),' >>> finished query: ',query)

如果我只想执行一个查询,我可以简单地做到:

loop = asyncio.get_event_loop()
placeholder = {}

loop.run_until_complete(_async_query(query = "select * from my_schema.my_table limit 10",cstr = cstr,output_store = placeholder,query_key = 1))

将运行查询并将数据作为数据框添加到placholder字典中。

我真正想做的是执行查询列表。因此,我做了一个助手函数,该函数针对列表中的每个查询调用了上述函数:

def async_query(query_list,output_store =None):
    loop = asyncio.get_event_loop()
    to_run = [_async_query(query = i,query_key = h,output_store = output_store) for h,i in enumerate(query_list)]
    cors = asyncio.wait(to_run)
    #results = await asyncio.gather(*to_run)
    output_store = loop.run_until_complete(asyncio.gather(*cors))
    #return await results
    loop.close()

然后我可以像下面这样使用此功能:

queries = ["select * from my_schema.my_table limit 10;","select * from my_schema.my_table limit 20;","select * from my_schema.my_table limit 40;"]
placeholder = {}
async_query(query_list = queries,output_store = placeholder)

然后,占位符字典将存储我的每个查询和所需的结果。

我的问题是:我应该如何更改代码,以便可以在不将空的hohodler传递给函数的情况下返回字典或结果数据帧列表?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...