问题描述
我正在尝试创建一个接收 PyMysqL 连接实例作为输入的 Prefect 任务,例如:
@task
def connect_db():
connection = pyMysqL.connect(user=user,password=password,host=host,port=port,db=db,connect_timeout=5,cursorclass=pyMysqL.cursors.DictCursor,local_infile=True)
return connection
@task
def query_db(connection) -> Any:
query = 'SELECT * FROM myschema.mytable;'
with connection.cursor() as cur:
cur.execute(query)
rows = cur.fetchall()
return rows
@task
def get_df(rows) -> Any:
return pd.DataFrame(rows,dtype=str)
@task
def save_csv(df):
path = 'mypath'
df.to_csv(path,sep=';',index=False)
with Flow(FLOW_NAME) as f:
con = connect_db()
rows = query_db(con)
df = get_df(rows)
save_csv(df)
但是,当我尝试注册结果流时,它引发了“TypeError: cannot pickle 'socket' object”。通过 Prefect 的文档,我发现了内置的 MysqL 任务 (https://docs.prefect.io/api/latest/tasks/mysql.html#mysqlexecute),但是它们每次被调用时都会打开和关闭连接。有什么办法可以将先前打开的连接传递给 Prefect Task(或实现诸如连接管理器之类的东西)?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)