Using Pygame with parallelism in Python

Problem description

I'm trying to train a neural network to play an SMB1 game made with Pygame. To speed things up, I'd like to use parallel processing so that multiple copies of the game can be played at once, by different members of the population (and on different training data).

The root of my problem is that Pygame isn't instance-based: it only ever creates a single window, with a single display object. Since I can't create multiple Pygame windows and display objects, one per process, the processes would have to share one display object. This leads to my first question: is there a way to have multiple instances of pygame and, if not, is there a (performance-light) way to draw to the display concurrently, that is, with each game drawing to a different part of the window?
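To illustrate what I mean by each game drawing to a different part of one window, here is a rough sketch; the games list and the per-game draw(surface) method are made up for illustration:

import pygame

# Rough illustration only: each game draws to its own off-screen Surface, and one
# loop blits those surfaces into a grid on the single shared display.
pygame.init()
COLS, ROWS, W, H = 2, 2, 320, 240
screen = pygame.display.set_mode((COLS * W, ROWS * H))
views = [pygame.Surface((W, H)) for _ in range(COLS * ROWS)]

def render_all(games):
    # `games` and game.draw(surface) are hypothetical stand-ins for the real game objects
    for i, (game, view) in enumerate(zip(games, views)):
        game.draw(view)                                        # game renders off-screen
        screen.blit(view, ((i % COLS) * W, (i // COLS) * H))   # blit into its grid cell
    pygame.display.flip()                                      # one flip for the whole window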

That said, I don't actually need every game to be rendered. I only care that at least one game instance is rendered so I can monitor its progress. My solution, then, was to give each game a process ID, and only the game with process ID 0 actually draws to the display. Concurrency problem solved! To do this, I used multiprocessing.Process:

processes = [];
genome_batches = np.array_split(genomes,self.runConfig.parallel_processes);
for i in range(self.runConfig.parallel_processes):
    process = multiprocessing.Process(target=self.eval_genome_batch_feedforward,args=(genome_batches[i],config,i));
    processes.append(process);
for process in processes:
    process.start();
for process in processes:
    process.join();
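Inside eval_genome_feedforward, the idea is then just a guard on processNum, roughly like this sketch (a standalone stand-in for the real method, with a placeholder fitness return):

def eval_genome_feedforward(genome, config, processNum=0, trainingDatum=None):
    # Sketch of the rendering guard only; the real method lives on GameRunner.
    render = (processNum == 0)       # only the worker with process ID 0 touches the display
    if render:
        import pygame
        pygame.display.init()        # every other worker runs the game headless
    # ... run the game, using pygame drawing/clock calls only when render is True ...
    return 0.0                       # placeholder fitness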

However, this caused problems of its own when multiprocessing pickled the objects: AttributeError: Can't pickle local object 'RunnerConfig.__init__.<locals>.<lambda>'. Note: config and RunnerConfig are two different things. The former comes from the neat library I'm using and is the config passed into the function; the latter is my own class and is an attribute of the class the processes are started from.

After some research, it seems that because I'm using a class method, multiprocessing has to pickle the class itself, which includes the RunnerConfig above, and that contains lambda functions which can't be pickled. Those lambdas are hard to work around because they are used specifically inside self.eval_genome_batch. This leads to the second question: is it possible to use multiprocessing.Process without pickling the enclosing class, so that the lambda functions are never pickled?
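One workaround would be to make the Process target a plain module-level function that receives only picklable arguments, so the GameRunner instance (and with it RunnerConfig and its lambdas) never gets pickled. Roughly (the worker below and its placeholder arguments are made up for illustration):

import multiprocessing

def eval_genome_batch_worker(genome_batch, config, process_num):
    # Plain module-level function: pickled by reference, so nothing from the
    # GameRunner/RunnerConfig instances has to be serialized.
    for genome_id, genome in genome_batch:
        pass  # evaluate the genome here using only what was passed in

if __name__ == '__main__':
    genome_batches = [[('g1', object()), ('g2', object())]]   # placeholder batches
    config = None                                              # placeholder neat config
    processes = []
    for i, batch in enumerate(genome_batches):
        p = multiprocessing.Process(target=eval_genome_batch_worker, args=(batch, config, i))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()

The downside is that the worker then has no access to self, so everything it needs from GameRunner would have to be passed in as plain data.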

Finally, after more research, I found that instead of multiprocessing, which uses pickle, I could use pathos.multiprocessing, which uses dill. Dill can pickle lambda functions. Hooray!

But no. There is one final catch. pathos.multiprocessing only has .map and .map-equivalent functions from multiprocessing, which doesn't let me control the processes myself. That means that when the function is called, there is no way (afaik) to tell the program which process ID is running the game and therefore whether to render it to the screen. So the last question is: is there a way to use pathos.multiprocessing.map (or, really, any library's parallel function) that a) doesn't choke on lambda functions and b) can tell the called function which process worker is being used?
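The closest I can get to (b) with a map-style API is to smuggle the worker index in with the work itself, e.g. by enumerating the batches before mapping over them. A minimal sketch, assuming pathos's ProcessingPool behaves as described and using made-up placeholder batches:

from pathos.multiprocessing import ProcessingPool

def eval_batch(indexed_batch):
    process_num, batch = indexed_batch       # the worker index travels with the data
    if process_num == 0:
        pass                                 # the only slot that would render to the screen
    return [genome_id for genome_id, genome in batch]   # placeholder result

if __name__ == '__main__':
    batches = [[('a', None)], [('b', None)], [('c', None)]]  # placeholder genome batches
    pool = ProcessingPool(nodes=len(batches))
    results = pool.map(eval_batch, list(enumerate(batches)))
    print(results)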

One final note: I know the simplest answer is just not to render with Pygame at all. That would solve everything. However, being able to watch the program's progress and learning is very useful and important to me.

So, here is the list of the different problems; solving any one of them would solve them all:

  • A way to use pygame as multiple separate instances in different threads spawned from the same process
  • A way to work safely and concurrently with pygame's display (and update its clock); see the sketch after this list
  • A way to use multiprocessing.Process without pickling the methods' class while still having access to class variables
  • A multiprocessing library that:
    • either doesn't need to pickle lambda functions, or can pickle them
    • has a way to tell the subprocess which process worker is being used
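For the second bullet, the only safe-looking pattern I can think of is to keep the display in one process and have the workers ship rendered pixels to it over a queue. A rough sketch (the frame size, 2x2 layout and the one-frame-per-worker loop are made up for illustration):

import multiprocessing
import pygame

SIZE = (320, 240)

def worker(frame_queue, process_num):
    # Workers never touch the display: they draw to an off-screen Surface and
    # push raw pixel bytes through the queue.
    pygame.init()
    surf = pygame.Surface(SIZE)
    surf.fill((40 * process_num, 0, 0))          # stand-in for actual game rendering
    frame_queue.put((process_num, pygame.image.tostring(surf, 'RGB')))

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(queue, i)) for i in range(4)]
    for p in procs:
        p.start()
    pygame.init()
    screen = pygame.display.set_mode((SIZE[0] * 2, SIZE[1] * 2))
    for _ in range(4):                            # one frame per worker in this sketch
        i, raw = queue.get()
        frame = pygame.image.frombuffer(raw, SIZE, 'RGB')
        screen.blit(frame, ((i % 2) * SIZE[0], (i // 2) * SIZE[1]))
        pygame.display.flip()                     # only the main process updates the display
    for p in procs:
        p.join()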

Edit: here is most of the relevant code. Since the classes are quite long, only the relevant methods are included. If you want, the full source is on my github.

game_runner_neat.py: the class that starts the parallel processing

import neat
import baseGame
import runnerConfiguration
import os.path
import os
import visualize
import random
import numpy as np
#import concurrent.futures
import multiprocessing
from logReporting import LoggingReporter
from renderer import Renderer as RendererReporter
from videofig import videofig as vidfig
from neat.six_util import iteritems,itervalues

class GameRunner:

    #if using default version,create basic runner and specify game to run
    def __init__(self,game,runnerConfig):
        self.game = game;
        self.runConfig = runnerConfig;

    #skip some code


    #parallel version of eval_genomes_feedforward
    def eval_genome_batch_feedforward(self,genomes,processNum):
        for genome_id,genome in genomes:
            genome.fitness += self.eval_genome_feedforward(genome,processNum=processNum);

    
    def eval_training_data_batch_feedforward(self,data,processNum,lock):
        for datum in data:
            for genome_id,genome in genomes:
                genome.increment_fitness(lock,self.eval_genome_feedforward(genome,processNum=processNum,trainingDatum=datum)); #increment_fitness allows multiple threads to change the fitness of the same genome safely

    #evaluate a population with the game as a feedforward neural net
    def eval_genomes_feedforward(self,genomes,config):
        for genome_id,genome in genomes:
            genome.fitness = 0; #sanity check
        if (self.runConfig.training_data is None):
            if (self.runConfig.parallel):
                processes = [];
                genome_batches = np.array_split(genomes,self.runConfig.parallel_processes);
                for i in range(self.runConfig.parallel_processes):
                    process = multiprocessing.Process(target=self.eval_genome_batch_feedforward,args=(genome_batches[i],config,i));
                    processes.append(process);
                for process in processes:
                    process.start();
                for process in processes:
                    process.join();
                return;
            else:
                for genome_id,genome in genomes:
                    genome.fitness += self.eval_genome_feedforward(genome,config)
        else:
            if (self.runConfig.parallel):
                processes = [];
                data_batches = np.array_split(self.runConfig.training_data,self.runConfig.parallel_processes);
                lock = multiprocessing.Lock();
                for i in range(self.runConfig.parallel_processes):
                    process = multiprocessing.Process(target=self.eval_training_data_batch_feedforward,args=(genomes,data_batches[i],i,lock));
                    processes.append(process);
                    process.start();
                for process in processes:
                    process.join();
                return;
            else:
                for datum in self.runConfig.training_data:
                    for genome_id,genome in genomes:
                        genome.fitness += self.eval_genome_feedforward(genome,config,trainingDatum=datum)

runnerConfiguration.py (the class with the lambda functions, passed to GameRunner through __init__):

class RunnerConfig:

    def __init__(self,gameFitnessFunction,gameRunningFunction,logging=False,logPath='',recurrent=False,trial_fitness_aggregation='average',custom_fitness_aggregation=None,time_step=0.05,num_trials=10,parallel=False,returnData=[],gameName='game',num_generations=300,fitness_collection_type=None):

        self.logging = logging;
        self.logPath = logPath;
        self.generations = num_generations;
        self.recurrent = recurrent;
        self.gameName = gameName;
        self.parallel = parallel;
        self.time_step = time_step;
        self.numTrials = num_trials;
        self.fitnessFromGameData = gameFitnessFunction;
        self.gameStillRunning = gameRunningFunction;
        self.fitness_collection_type = fitness_collection_type;

        self.returnData = returnData;
##        for (datum in returnData):
##            if (isinstance(datum,IOData)):
##                [returnData.append(x) for x in datum.getSplitData()];
##            else:
##                returnData.append(datum);
##        
        if (trial_fitness_aggregation == 'custom'):
            self.fitnessFromArray = custom_fitness_aggregation;

        if (trial_fitness_aggregation == 'average'):
            self.fitnessFromArray = lambda fitnesses : sum(fitnesses)/len(fitnesses);

        if (trial_fitness_aggregation == 'max'):
            self.fitnessFromArray = lambda fitnesses : max(fitnesses);

        if (trial_fitness_aggregation == 'min'):
            self.fitnessFromArray = lambda fitnesses : min(fitnesses);

gameFitnessFunction and gameRunningFunction are functions passed in to customize training behavior.
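For what it's worth, the built-in aggregation lambdas could presumably be replaced with module-level named functions, which pickle by reference; a sketch of that variant (not the code I'm currently running):

def average_fitness(fitnesses):
    return sum(fitnesses) / len(fitnesses)

# Module-level functions (and builtins like max/min) are picklable by reference,
# unlike lambdas created inside __init__.
_AGGREGATORS = {'average': average_fitness, 'max': max, 'min': min}

# then, inside RunnerConfig.__init__:
#     if trial_fitness_aggregation == 'custom':
#         self.fitnessFromArray = custom_fitness_aggregation
#     else:
#         self.fitnessFromArray = _AGGREGATORS[trial_fitness_aggregation]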

When the program tries to run eval_genomes_feedforward with RunnerConfig.parallel = True, I get the following full error message:

Traceback (most recent call last):
  File "c:/Users/harrison_truscott/Documents/GitHub/AI_game_router/Neat/smb1Py_runner.py",line 94,in <module>
    winner = runner.run(config,'run_' + str(currentRun));
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py",line 75,in run
    winner = pop.run(self.eval_genomes,self.runConfig.generations);
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\neat\population.py",line 102,in run
    fitness_function(list(iteritems(self.population)),self.config)
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py",line 204,in eval_genomes
    self.eval_genomes_feedforward(genomes,config);
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py",line 276,in eval_genomes_feedforward
    process.start();
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\process.py",line 121,in start
    self._popen = self._Popen(self)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\context.py",line 224,in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\context.py",line 326,in _Popen
    return Popen(process_obj)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\popen_spawn_win32.py",line 93,in __init__
    reduction.dump(process_obj,to_child)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\reduction.py",line 60,in dump
    ForkingPickler(file,protocol).dump(obj)
AttributeError: Can't pickle local object 'RunnerConfig.__init__.<locals>.<lambda>'

When the first process breaks, I then get a second error message when the next process fails because of the first one's incomplete start:

Traceback (most recent call last):
  File "<string>",line 1,in <module>
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py",line 116,in spawn_main
    exitcode = _main(fd,parent_sentinel)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py",line 125,in _main
    prepare(preparation_data)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py",line 236,in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py",line 287,in _fixup_main_from_path
    main_content = runpy.run_path(main_path,
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\runpy.py",line 263,in run_path
    return _run_module_code(code,init_globals,run_name,
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\runpy.py",line 96,in _run_module_code
    _run_code(code,mod_globals,
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\runpy.py",line 86,in _run_code
    exec(code,run_globals)
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\smb1Py_runner.py",line 45,in __init__
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py",line 154,in get_preparation_data
    _check_not_importing_main()
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py",line 134,in _check_not_importing_main
    raise RuntimeError('''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

As an aside, multiprocessing.freeze_support() is the first thing I call in the main file being run.
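For completeness, this is the shape the multiprocessing docs expect the main module to have when the spawn start method is used: everything that ends up starting processes has to sit under the __main__ check, because each child re-imports the main module. An illustrative skeleton, not my actual smb1Py_runner.py:

import multiprocessing

def main():
    # build the game, RunnerConfig and GameRunner here, then call runner.run(...)
    pass

if __name__ == '__main__':
    multiprocessing.freeze_support()   # only matters for frozen executables
    main()                             # nothing above this guard starts a process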
