无法将动态类与parallel..futures.ProcessPoolExecutor一起使用

问题描述

在下面的代码中,我正在使用_py方法generate_object属性内动态创建该类的对象。

如果我不使用并发方法,则代码可以完美工作。但是,如果我使用concurrent.futures的并发性,则由于出现错误(除其他外),我无法获得预期的结果:

_pickle.PicklingError: Can't pickle <class '__main__.Script_0_1'>: attribute lookup Script_0_1 on __main__ Failed

搜索了此错误之后,我了解到只有可腌制的对象将作为参数传递给ProcesspoolExecutor.map(),所以我决定看看如何将动态类变为可腌制的。

问题是该问题的所有其他解决方案都以不同的方式创建动态对象(与我在_string_to_object()中使用的对象不同)。示例:12

我非常想使动态对象的创建保持现在的状态,因为我的很多真实代码都基于它,因此,我正在寻找一种适用于以下这种玩具代码的并发解决方案。

>

代码

import random
import codecs
import re
from concurrent.futures import ProcesspoolExecutor
import multiprocessing

class A:
    def __init__(self):
        self._py = r'''
class Script_{0}_{1}:
\tdef print_numbers(self):
\t\tprint('Numbers = ',{0},'and',{1})
'''
    
    def generate_text(self,name_1,name_2):
        py = self._py.format(name_1,name_2)
        py = codecs.decode(py,'unicode_escape')
        return py

    def generate_object(self,number_1,number_2):
        """ Generate an object of the class inside the string self._py """

        return self._string_to_object(self.generate_text(number_1,number_2))

    def _string_to_object(self,str_class,*args,**kwargs):
        """ Transform a program written inside str_class to an object. """

        exec(str_class)
        class_name = re.search("class (.*):",str_class).group(1).partition("(")[0]
        return locals()[class_name](*args,**kwargs)

from functools import partial

print('Single usage')
a = A()
script = a.generate_object(1,2)
script.print_numbers()

print('Multiprocessing usage')
n_cores = 3
n_calls = 3

def concurrent_function(args):
    first_A = args[0]
    second_A = args[1]
    first_A.print_numbers()
    second_A.print_numbers()

with ProcesspoolExecutor(max_workers=n_cores) as executor:
    args = ( (A().generate_object(i,i+1),A().generate_object(i+1,i+2)) for i in range(n_calls))
    results = executor.map(concurrent_function,args)

解决方法

我想不出一种方法来严格遵循您当前的方案在全局名称空间中创建Script类。但是:

由于每次调用方法generate_object时,您要在本地名称空间中创建一个新类并实例化该类的对象,为什么不将其工作推迟到进程池中进行呢?这还具有并行执行此类创建处理的额外优点,并且不需要酸洗。现在,我们将两个整数参数concurrent_functionnumber_1传递给number_2

import random
import codecs
import re
from concurrent.futures import ProcessPoolExecutor


class A:
    def __init__(self):
        self._py = r'''
class Script_{0}_{1}:
\tdef print_numbers(self):
\t\tprint('Numbers = ',{0},'and',{1})
'''

    def generate_text(self,name_1,name_2):
        py = self._py.format(name_1,name_2)
        py = codecs.decode(py,'unicode_escape')
        return py

    def generate_object(self,number_1,number_2):
        """ Generate an object of the class inside the string self._py """

        return self._string_to_object(self.generate_text(number_1,number_2))

    def _string_to_object(self,str_class,*args,**kwargs):
        """ Transform a program written inside str_class to an object. """

        exec(str_class)
        class_name = re.search("class (.*):",str_class).group(1).partition("(")[0]
        return locals()[class_name](*args,**kwargs)

"""
from functools import partial

print('Single usage')
a = A()
script = a.generate_object(1,2)
script.print_numbers()
"""


def concurrent_function(args):
    for arg in args:
        obj = A().generate_object(arg[0],arg[1])
        obj.print_numbers()

def main():
    print('Multiprocessing usage')
    n_cores = 3
    n_calls = 3

    with ProcessPoolExecutor(max_workers=n_cores) as executor:
        args = ( ((i,i+1),(i+1,i+2)) for i in range(n_calls))
        # wait for completion of all tasks:
        results = list(executor.map(concurrent_function,args))

if __name__ == '__main__':
    main()

打印:

Multiprocessing usage
Numbers =  0 and 1
Numbers =  1 and 2
Numbers =  1 and 2
Numbers =  2 and 3
Numbers =  2 and 3
Numbers =  3 and 4

更有效的方式

无需使用exec。而是使用闭包:

from concurrent.futures import ProcessPoolExecutor

def make_print_function(number_1,number_2):
    def print_numbers():
        print(f'Numbers = {number_1} and {number_2}')

    return print_numbers



def concurrent_function(args):
    for arg in args:
        fn = make_print_function(arg[0],arg[1])
        fn()


def main():
    print('Multiprocessing usage')
    n_cores = 3
    n_calls = 3

    with ProcessPoolExecutor(max_workers=n_cores) as executor:
        args = ( ((i,args))

if __name__ == '__main__':
    main()

打印:

Multiprocessing usage
Numbers = 0 and 1
Numbers = 1 and 2
Numbers = 1 and 2
Numbers = 2 and 3
Numbers = 2 and 3
Numbers = 3 and 4

使用对象缓存来避免不必要地创建新对象

obj_cache = {} # each process will have its own

def concurrent_function(args):
    for arg in args:
        # was an object created with this set of arguments: (arg[0],arg[1])?
        obj = obj_cache.get(arg)
        if obj is None: # must create new object
            obj = A().generate_object(arg[0],arg[1])
            obj_cache[arg] = obj # save object for possible future use
        obj.print_numbers()
,

可能我找到了不需要exec()函数的方法。实现(带有注释)如下。

import codecs
from concurrent.futures import ProcessPoolExecutor

class A:
    def __init__(self):
        self.py = r'''
class Script_{0}_{1}:
\tdef print_numbers(self):
\t\tprint('Numbers = ',{1})
'''
    def generate_text(self,number_2):
        py = self.py.format(number_1,number_2)
        py = codecs.decode(py,number_2):
        class_code = self.generate_text(number_1,number_2)
        # Create file in disk
        with open("Script_" + str(number_1) + "_" + str(number_2) + ".py","w") as file:
            file.write(class_code)
        # Now import it and the class will now be (correctly) seen in __main__
        package = "Script_" + str(number_1) + "_" + str(number_2)
        class_name = "Script_" + str(number_1) + "_" + str(number_2)
        # This is the programmatically version of 
        # from <package> import <class_name>
        class_name = getattr(__import__(package,fromlist=[class_name]),class_name)
        return class_name()

def concurrent_function(args):
    first_A = args[0]
    second_A = args[1]
    first_A.print_numbers()
    second_A.print_numbers()

def main():
    print('Multiprocessing usage')
    n_cores = 3
    n_calls = 2
    
    with ProcessPoolExecutor(max_workers=n_cores) as executor:
        args = ( (A().generate_object(i,A().generate_object(i+2,i+3)) for i in range(n_calls))
        results = executor.map(concurrent_function,args)

if __name__ == '__main__':
    main()

基本上我在做什么,而不是动态分配类,而是将其写入文件。我这样做是因为问题的根源在于,当查看全局范围时,pickle无法正确定位嵌套类。现在,我以编程方式导入该类(将其保存到文件后)。

当然,此解决方案还具有必须处理文件的瓶颈,这也很昂贵。我没有测量处理文件或exec的速度是否更快,但是在我的实际情况下,我只需要综合类的一个对象(而不是所提供的玩具代码中的每个并行调用一个对象),因此文件选项最适合我。

还存在一个问题:使用n_calls = 15(例如)并执行多次后,似乎有时无法导入模块(刚刚创建的文件)。我尝试在以编程方式导入sleep()之前放一个Traceback (most recent call last): File "main.py",line 45,in <module> main() File "main.py",line 42,in main results = executor.map(concurrent_function,args) File "/usr/lib/python3.8/concurrent/futures/process.py",line 674,in map results = super().map(partial(_process_chunk,fn),File "/usr/lib/python3.8/concurrent/futures/_base.py",line 600,in map fs = [self.submit(fn,*args) for args in zip(*iterables)] File "/usr/lib/python3.8/concurrent/futures/_base.py",in <listcomp> fs = [self.submit(fn,*args) for args in zip(*iterables)] File "/usr/lib/python3.8/concurrent/futures/process.py",line 184,in _get_chunks chunk = tuple(itertools.islice(it,chunksize)) File "main.py",line 41,in <genexpr> args = ( (A().generate_object(i,i+3)) for i in range(n_calls)) File "main.py",line 26,in generate_object class_name = getattr(__import__(package,class_name) ModuleNotFoundError: No module named 'Script_13_14' ,但是它没有帮助。使用少量呼叫时似乎不会发生此问题,并且似乎也是随机发生的。错误堆栈的一部分示例如下所示:

(