Python Prefect 上的 MySQL 连接

问题描述

我正在尝试创建一个接收 PyMysqL 连接实例作为输入的 Prefect 任务,例如:

@task
def connect_db():
    connection = pyMysqL.connect(user=user,password=password,host=host,port=port,db=db,connect_timeout=5,cursorclass=pyMysqL.cursors.DictCursor,local_infile=True)
    return connection


@task
def query_db(connection) -> Any:
    query = 'SELECT * FROM myschema.mytable;'
    with connection.cursor() as cur:
        cur.execute(query)
        rows = cur.fetchall()
    return rows


@task
def get_df(rows) -> Any:
    return pd.DataFrame(rows,dtype=str)


@task
def save_csv(df):
    path = 'mypath'
    df.to_csv(path,sep=';',index=False)


with Flow(FLOW_NAME) as f:
    con = connect_db()
    rows = query_db(con)
    df = get_df(rows)
    save_csv(df)

但是,当我尝试注册结果流时,它引发了“TypeError: cannot pickle 'socket' object”。通过 Prefect 的文档,我发现了内置的 MysqL 任务 (https://docs.prefect.io/api/latest/tasks/mysql.html#mysqlexecute),但是它们每次被调用时都会打开和关闭连接。有什么办法可以将先前打开的连接传递给 Prefect Task(或实现诸如连接管理器之类的东西)?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)