python pathos map函数返回''_hashlib.HASH'对象'

问题描述

这是我的代码(基于此 answer from @Mike McKerns):

from pathos.multiprocessing import ProcessingPool as Pool

class dispatcher():

    def __init__(self,controller):
        self.controller = controller
        self.sf = self.controller.sf
        self.jobs = []
        self.completed_jobs = []
        self.containers = []

    def add_job(self,container):
        self.containers.append(container)
        self.jobs.append(Job(**container.job_options))

    def submit_current_jobs(self):
        if self.jobs:
            args = []
            for i,job in enumerate(self.jobs):
                args.append([job.source,job.sql,i])
            self.completed_jobs = Pool().map(self.submit_to_sNowflake,args)
        else:
            with self.controller.output:
                print('Cannot submit the current jobs,none have been created!')

    def submit_to_sNowflake(self,job_source,job_sql,job_index):
        print(f"src={job_source},idx={job_index},sql len={len(job_sql)}")
        self.containers[job_index].df  = self.sf.run(self.controller.query_ui.output,job_sql) if job_source == 'query' else self.sf.run(self.controller.upload_ui.widgets['log'],job_sql)
        
class Job():

    def __init__(self,**options):
        self.sql = options['sql']
        self.source = 'query' if 'project' in options else 'upload'
        self.timestamp = datetime.datetime.Now().strftime("%Y%m%d-%H%M%s")
        if self.source == 'query':
            self.project = options['project']
            self.query = options['query']
            self.events = options['events']
        else:
            self.path = options['path']

以下是传递给 submit_to_sNowflake 函数的对象类型:

<class 'str'> <class 'str'> <class 'int'>

我已验证 args 是包含另一个长度为 3 的 listlist,指定 job_sourcejob_sqljob_index

调度器在用户界面类中实例化,获取错误代码序列为:

self.dispatcher = dispatcher.dispatcher(self)
self.dispatcher.add_job(self.container)   
self.dispatcher.submit_current_jobs()

错误是:

~\.conda\envs\nemawashi\lib\site-packages\dill\_dill.py in save_module_dict(pickler,obj)
    939             # we only care about session the first pass thru
    940             pickler._session = False
--> 941         StockPickler.save_dict(pickler,obj)
    942         log.info("# D2")
    943     return

~\.conda\envs\nemawashi\lib\pickle.py in save_dict(self,obj)
    969 
    970         self.memoize(obj)
--> 971         self._batch_setitems(obj.items())
    972 
    973     dispatch[dict] = save_dict

~\.conda\envs\nemawashi\lib\pickle.py in _batch_setitems(self,items)
    995                 for k,v in tmp:
    996                     save(k)
--> 997                     save(v)
    998                 write(SETITEMS)
    999             elif n:

~\.conda\envs\nemawashi\lib\pickle.py in save(self,obj,save_persistent_id)
    576                 reduce = getattr(obj,"__reduce_ex__",None)
    577                 if reduce is not None:
--> 578                     rv = reduce(self.proto)
    579                 else:
    580                     reduce = getattr(obj,"__reduce__",None)

TypeError: cannot pickle '_hashlib.HASH' object

这是因为我正在调用 self.sf.run 函数内的另一个函数 (self.submit_to_sNowflake) 吗?

谢谢

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)