遗传算法——在具有未绑定方法的类中使用多处理

问题描述

我最近一直在尝试通过使用未绑定方法(基本上,它是用户提供的函数)在类本身内部使用 multiprocessing 库来并行化我的一些代码(为了速度)保存在类属性内)。它根本不起作用。

上下文:我正在尝试并行化一个“并行遗传算法”类,顾名思义,它本身就是一个令人尴尬的并行问题。

据我所知,在我的代码中有两个问题。 (1) 用户提供的适应度函数不会通过 Pool 对象生成的进程导出,并且似乎没有多少深度复制能够解决它。 (2) 另一个问题是,也许 Pool 对象不确定如何处理多维输出......我真的不确定这个 tbh。

为了让事情更清晰,我已经尝试编写出一个很小的独立版本的代码(它目前没有运行,因为存在错误,这是重点):

from itertools import repeat
import multiprocessing as mp
import numpy as np

class geneticAlgorithm:
    def __init__(self,I,G,D,U,fitness_function,run_parallel):
        self.fitness_function = fitness_function # User-supplied fitness function
        self.D = D # Problem size (number of genes)
        self.I = I # Population size (number of individuals)
        self.G = G # Number of generations
        self.U = U # Number of parallel populations
        self.run_parallel = run_parallel

    def sga(self):
        '''One single-threaded genetic algorithm'''
        # Activation rate is fixed at 0.5 for the sake of this MWE
        pop = np.random.binomial(size=self.I * self.D,n=1,p=0.5).reshape(self.I,self.D) # Seed population

        for g in range(self.G):
            # fitness is computed for all individuals
            fitpop = np.array([self.fitness_function(ind=ind) for ind in pop])
            # fitness is scaled back to 100%
            fitpop /= np.sum(fitpop)
            # 2I parents are selected at random according to each individual's relative fitness score
            parents = np.random.choice(range(self.I),size=2 * self.I,replace=True,p=fitpop).reshape(self.I,2)
            # Parents are crossed 2 by 2,with each pair producing exactly one offspring
            crossover = np.array([np.random.choice(parents[i,:],size=self.D,replace=True) for i in range(self.I)]).reshape(self.I,self.D)
            embryos = np.array([[pop[crossover[i,d],d] for d in range(self.D)] for i in range(self.I)]).reshape(self.I,self.D)
            # Mutation rate is fixed at 1/D for the sake of this MWE
            mutations = np.random.binomial(size=self.I * self.D,p=1 / self.D).reshape(self.I,self.D)
            # "Mutated embryos" become fully fledged individuals and replace the parent generation
            pop = (1 - mutations) * embryos + mutations * (1 - embryos)

        # Individuals are aggregated gene-wise,with the average of active and inactive genes creating a ratio
        return pop.mean(axis=0)

    def pga(self):
        '''Multiple parallel genetic algorithms'''
        if self.run_parallel:
            p = mp.Pool(mp.cpu_count())
            universes = p.starmap(geneticAlgorithm.sga,zip(repeat(self,self.U)))
            p.close()
            p.join()
        else:
            universes = np.zeros(self.U * self.D).reshape(self.U,self.D)
            
            for u in range(self.U):
                universes[u,:] = self.sga()

        # Multiple GAs are aggregated in a sort of "mean of means"
        return universes.mean(axis=0)

if __name__ == '__main__':
    def my_fitness_function(ind):
        '''Dummy fitness function,scores all individual equally...'''
        return 1.0

    # Dummy test to check if the code runs... it doesn't :(
    ga = geneticAlgorithm(I=10,G=3,D=5,U=10,fitness_function=my_fitness_function,run_parallel=True)
    print(ga.pga())

任何类型的提示代码或完整的解决方案都将受到欢迎。这曾经在 R 中相当容易,但是对于 Python,我显然不知所措......谢谢!

ETA:修正了代码中的一些拼写错误,并添加run_parallel 参数以表明它在没有并行化的情况下运行得非常好。哦,是的,另外,我在 Windows 上运行,否则我可能会尝试使用 Ray 库,我听说它可以创造奇迹,尤其是与 multiprocessing 相比时。

解决方法

查看我对您帖子的评论。此外,池中的每个进程都应该唯一地为随机数生成器提供种子。否则,它们将生成相同的随机数序列。

请注意,由于创建进程池和将参数和结果从一个进程的地址空间传递到另一个地址空间的开销,多处理不一定运行得更快。 您的工作函数 sga 必须具有足够的 CPU 密集度,才能使多处理具有优势,我现在明白这实际上是

from itertools import repeat
import multiprocessing as mp
import numpy as np

# this will be executed by each process in the pool:
def init_pool():
    from threading import current_thread
    ident = current_thread().ident
    np.random.seed(ident)

def my_fitness_function(ind):
    '''Dummy fitness function,scores all individual equally...'''
    return 1.0

class GeneticAlgorithm:
    def __init__(self,I,G,D,U,fitness_function,run_parallel):
        self.fitness_function = fitness_function # User-supplied fitness function
        self.D = D # Problem size (number of genes)
        self.I = I # Population size (number of individuals)
        self.G = G # Number of generations
        self.U = U # Number of parallel populations
        self.run_parallel = run_parallel

    def sga(self):
        '''One single-threaded genetic algorithm'''
        # Activation rate is fixed at 0.5 for the sake of this MWE
        pop = np.random.binomial(size=self.I * self.D,n=1,p=0.5).reshape(self.I,self.D) # Seed population

        for g in range(self.G):
            # fitness is computed for all individuals
            fitpop = np.array([self.fitness_function(ind=ind) for ind in pop])
            # fitness is scaled back to 100%
            fitpop /= np.sum(fitpop)
            # 2I parents are selected at random according to each individual's relative fitness score
            parents = np.random.choice(range(self.I),size=2 * self.I,replace=True,p=fitpop).reshape(self.I,2)
            # Parents are crossed 2 by 2,with each pair producing exactly one offspring
            crossover = np.array([np.random.choice(parents[i,:],size=self.D,replace=True) for i in range(self.I)]).reshape(self.I,self.D)
            embryos = np.array([[pop[crossover[i,d],d] for d in range(self.D)] for i in range(self.I)]).reshape(self.I,self.D)
            # Mutation rate is fixed at 1/D for the sake of this MWE
            mutations = np.random.binomial(size=self.I * self.D,p=1 / self.D).reshape(self.I,self.D)
            # "Mutated embryos" become fully fledged individuals and replace the parent generation
            pop = (1 - mutations) * embryos + mutations * (1 - embryos)

        # Individuals are aggregated gene-wise,with the average of active and inactive genes creating a ratio
        return pop.mean(axis=0)

    def pga(self):
        '''Multiple parallel genetic algorithms'''
        universes = np.zeros(self.U * self.D).reshape(self.U,self.D)
        if self.run_parallel:
            pool_size = min(mp.cpu_count(),self.U)
            p = mp.Pool(pool_size,initializer=init_pool)
            results = p.starmap(GeneticAlgorithm.sga,zip(repeat(self,self.U)))
            p.close()
            p.join()
            for u in range(self.U):
                universes[u,:] = results[u]
        else:
            for u in range(self.U):
                universes[u,:] = self.sga()

        # Multiple GAs are aggregated in a sort of "mean of means"
        return universes.mean(axis=0)

if __name__ == '__main__':
    # Dummy test to check if the code runs... it doesn't :(
    ga = GeneticAlgorithm(I=10,G=3,D=5,U=10,fitness_function=my_fitness_function,run_parallel=True)
    print(ga.pga())

打印:

[0.56 0.46 0.38 0.54 0.52]