Python在atexit._run_exitfuncs中的并发错误:OSError:仅在Visual Studio调试模式下运行的句柄是关闭的

问题描述

我试图实现并发功能。未来一直存在问题,我生成一个在以下位置执行的代码

  • Python终端

  • Enters in some infinite loop in my last loop and do not finish the code,this code below do finish,however reproduces the error thgat i receive in the Debug Mode below
    
  • 所以当我在Visual Studio代码调试模式下运行它

  • 生成以下错误

    Error in atexit._run_exitfuncs:
    Traceback (most recent call last):
      File "C:\Users\debor\anaconda3\envs\multimidia_image\lib\concurrent\futures\process.py",line 102,in _python_exit
        thread_wakeup.wakeup()
      File "C:\Users\debor\anaconda3\envs\multimidia_image\lib\concurrent\futures\process.py",line 90,in wakeup
        self._writer.send_bytes(b"")
      File "C:\Users\debor\anaconda3\envs\multimidia_image\lib\multiprocessing\connection.py",line 183,in send_bytes
        self._check_closed()
      File "C:\Users\debor\anaconda3\envs\multimidia_image\lib\multiprocessing\connection.py",line 136,in _check_closed
        raise OSError("handle is closed")
    OSError: handle is closed
    

代码在调试模式下重现该错误

import numpy as np
import pandas as pd
import concurrent.futures
import multiprocessing
from itertools import product



def main_execucao(n_exec,input_n_geracoes_max,tipo_apt,input_pop_0,tipo_pop_gerar,tipo_selecao_crossover,tour,tipo_crossover,input_p_cross,tipo_mutacao,tipo_reinsercao,words):

    # Dicionário vazio para armazenar resultados por geração n+1
    resultados = {}
    resultados[("teste")] = [1,0]
    return resultados


def main(): 

    # Parametros problemáticos Process
    tmi = ["tm1"]
    si = ["s1"]
    ci = ["c1"]
    ri = ["r2"]

    # Lista de variantes
    variantes = list(product(*[tmi,si,ci,ri]))

    # Numero de execuções
    input_n_execucoes = 3
    

    # Inputs para main

    # Inputs gerais

    # Lista iterável para chamar função
    lista_n_exec=range(0,input_n_execucoes)

    # Numero de gerações
    input_n_geracoes_max =  [50] * len(lista_n_exec)

    # Palavras
    w_1=np.flip(np.array(list("send")),0)
    w_2=np.flip(np.array(list("more")),0)
    w_3=np.flip(np.array(list("money")),0)
    words=[w_1,w_2,w_3]
    words_lista=[words]* len(lista_n_exec)

    for param in variantes:

        # Inputs parameters
    
        # Tipo aptidao (apt_1= aptidão simples,apt_2 aptidão invertida,apt_3 invertida normalizada pior valor)
        tipo_apt = "apt_1"
        tipo_apt_lista = [tipo_apt] * len(lista_n_exec)

        # População inicial

        input_pop_0 = [100] * len(lista_n_exec)

        tipo_pop_gerar = ["sem_repeticao_populacao_inicial"] * len(lista_n_exec)


        nome_selecao_cross  =param[1]
        tipo_selecao_crossover = [nome_selecao_cross] * len(lista_n_exec)
        tour = [3] * len(lista_n_exec)# Apenas se seleção for torneio 

        # Tipo de crossover

        nome_cross = param[2]
        tipo_crossover = [nome_cross] * len(lista_n_exec)
        # % de Crossover
        input_p_cross = [0.8] * len(lista_n_exec)

        # % de mutação

        nome_mutacao = param[0]
        if nome_mutacao=="tm1":
            perc_mutacao=0.02
        elif nome_mutacao=="tm2":
            perc_mutacao=0.10
        elif nome_mutacao=="tm3":
            perc_mutacao=0.2

        tipo_mutacao = [perc_mutacao] * len(lista_n_exec)

        # Tipo de reinserção

        nome_reinsercao = param[3]
        tipo_reinsercao = [nome_reinsercao] * len(lista_n_exec)

        # Finaliza inputs

        # Index contar número de execuções e gerações
        ix_exec_real=0
        ger_exec_real=0

        # Número total de convergencias após todas execuções 
        total_conv_tds_exec=0

        # Dicionário para armazenar resultados de todas as n execuções
        results={}

        with concurrent.futures.ProcesspoolExecutor() as executor:

            for result in (executor.map(main_execucao,lista_n_exec,tipo_apt_lista,words_lista)):
                results.update(result)
            
            print("finish")

if __name__ == '__main__':
    multiprocessing.freeze_support()
    main()

有人知道我可以尝试什么吗? 我发现了类似的问题: https://github.com/getsentry/sentry-python/issues/423

非常感谢

解决方法

通过更改两行从使用 MessageContext 类切换到使用 concurrent.futures.ProcessPoolExecutor 类来查看问题是否消失:

更改自:

multiprocess.pool.Pool

致:

        with concurrent.futures.ProcessPoolExecutor() as executor:

            for result in (executor.map(main_execucao,lista_n_exec,input_n_geracoes_max,tipo_apt_lista,input_pop_0,tipo_pop_gerar,tipo_selecao_crossover,tour,tipo_crossover,input_p_cross,tipo_mutacao,tipo_reinsercao,words_lista)):

当然,您现在也可以删除 with multiprocessing.Pool() as executor: for result in (executor.starmap(main_execucao,zip(lista_n_exec,words_lista))): 语句。

,

每个#434

Python 标准库—— concurrent.futures 使用线程来管理 任务队列,因此当哨兵修补线程的启动方法时,它 将导致引用循环并阻止gc收集。

所以在 concurrent.futures.process 中,_queue_management_thread 中的 _threads_wakeups 将被丢弃,直到运行完整的 gc 集合,否则,python 退出时将引发异常,因为实际 管理线程已经停止。

可以在那里找到修复程序,允许将其手动输入到他们的 Python 3.8 而不是升级到 Python 3.9。