问题描述
这是我的代码(基于此 answer from @Mike McKerns):
from pathos.multiprocessing import ProcessingPool as Pool
class dispatcher():
def __init__(self,controller):
self.controller = controller
self.sf = self.controller.sf
self.jobs = []
self.completed_jobs = []
self.containers = []
def add_job(self,container):
self.containers.append(container)
self.jobs.append(Job(**container.job_options))
def submit_current_jobs(self):
if self.jobs:
args = []
for i,job in enumerate(self.jobs):
args.append([job.source,job.sql,i])
self.completed_jobs = Pool().map(self.submit_to_sNowflake,args)
else:
with self.controller.output:
print('Cannot submit the current jobs,none have been created!')
def submit_to_sNowflake(self,job_source,job_sql,job_index):
print(f"src={job_source},idx={job_index},sql len={len(job_sql)}")
self.containers[job_index].df = self.sf.run(self.controller.query_ui.output,job_sql) if job_source == 'query' else self.sf.run(self.controller.upload_ui.widgets['log'],job_sql)
class Job():
def __init__(self,**options):
self.sql = options['sql']
self.source = 'query' if 'project' in options else 'upload'
self.timestamp = datetime.datetime.Now().strftime("%Y%m%d-%H%M%s")
if self.source == 'query':
self.project = options['project']
self.query = options['query']
self.events = options['events']
else:
self.path = options['path']
以下是传递给 submit_to_sNowflake
函数的对象类型:
<class 'str'> <class 'str'> <class 'int'>
我已验证 args
是包含另一个长度为 3 的 list
的 list
,指定 job_source
、job_sql
和 job_index
。
self.dispatcher = dispatcher.dispatcher(self)
self.dispatcher.add_job(self.container)
self.dispatcher.submit_current_jobs()
错误是:
~\.conda\envs\nemawashi\lib\site-packages\dill\_dill.py in save_module_dict(pickler,obj)
939 # we only care about session the first pass thru
940 pickler._session = False
--> 941 StockPickler.save_dict(pickler,obj)
942 log.info("# D2")
943 return
~\.conda\envs\nemawashi\lib\pickle.py in save_dict(self,obj)
969
970 self.memoize(obj)
--> 971 self._batch_setitems(obj.items())
972
973 dispatch[dict] = save_dict
~\.conda\envs\nemawashi\lib\pickle.py in _batch_setitems(self,items)
995 for k,v in tmp:
996 save(k)
--> 997 save(v)
998 write(SETITEMS)
999 elif n:
~\.conda\envs\nemawashi\lib\pickle.py in save(self,obj,save_persistent_id)
576 reduce = getattr(obj,"__reduce_ex__",None)
577 if reduce is not None:
--> 578 rv = reduce(self.proto)
579 else:
580 reduce = getattr(obj,"__reduce__",None)
TypeError: cannot pickle '_hashlib.HASH' object
这是因为我正在调用 self.sf.run
函数内的另一个函数 (self.submit_to_sNowflake
) 吗?
谢谢
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)