在多处理中发生错误时停止进程

问题描述

我在 python 中创建了 3 个进程。我附上了一个代码。 现在我想停止运行p2,p3进程的执行,因为我由于p1进程而出错。我有想法添加p2.terminate(),我不知道在这种情况下添加到哪里。提前致谢。

def table(a):
     try:
        for i in range(100):
            print(i,'x',a,'=',a*i)
     except:
        print("error")

processes = []
p1= multiprocessing.Process(target = table,args=['s'])
p2= multiprocessing.Process(target = table,args=[5])
p3= multiprocessing.Process(target = table,args=[2])
p1.start()
p2.start()
p3.start()

processes.append(p1)
processes.append(p2)
processes.append(p3)

for process in processes:
    process.join()```


解决方法

要在某个进程因错误而终止时停止任何给定进程,首先将目标 table() 设置为以适当的 exitcode > 0 退出

def table(args):
    try:
        for i in range(100):
            print(i,'x',a,'=',a*i)
     except:
        sys.exit(1)
    sys.exit(0)

然后您可以启动您的进程并轮询这些进程以查看是否有任何进程已终止。

#!/usr/bin/env python3
# coding: utf-8

import multiprocessing
import time
import logging
import sys

logging.basicConfig(level=logging.INFO,format='[%(asctime)-15s] [%(processName)-10s] %(message)s',datefmt='%Y-%m-%d %H:%M:%S')


def table(args):
    try:
        for i in range(5):
            logging.info('{} x {} = {}'.format(i,args,i*args))
            if isinstance(args,str):
                raise ValueError()
            time.sleep(5)
    except:
        logging.error('Done in Error Path: {}'.format(args))
        sys.exit(1)
    logging.info('Done in Success Path: {}'.format(args))
    sys.exit(0)


if __name__ == '__main__':
    p1 = multiprocessing.Process(target=table,args=('s',))
    p2 = multiprocessing.Process(target=table,args=(5,))
    p3 = multiprocessing.Process(target=table,args=(2,))
    processes = [p1,p2,p3]
    for process in processes:
        process.start()

    while True:
        failed = []
        completed = []
        for process in processes:
            if process.exitcode is not None and process.exitcode != 0:
                failed.append(process)
        if failed:
            for process in processes:
                if process not in failed:
                    logging.info('Terminating Process: {}'.format(process))
                    process.terminate()
            break
        if len(completed) == len(processes):
            break
        time.sleep(1)

本质上,您正在使用 terminate() 来停止仍在运行的剩余进程。

,

首先,我修改了函数 table 以抛出当传递给它的参数为“s”时捕获的异常,否则在打印之前延迟 0.1 秒以给出主进程有机会通过异常意识到子进程并且可以在其他进程开始打印之前取消它们。否则,其他进程将在您取消之前完成。这里我使用了一个进程池,它支持 terminate 方法,可以方便地终止所有提交的、未完成的任务,而不必单独取消每个任务(尽管这也是一种选择)。 >

代码创建了一个大小为 3 的多处理池,因为这是提交的“任务”的数量,然后使用方法 apply_async 提交 3 个任务以并行运行(假设您至少有 3 个处理器) . apply_sync 返回一个 AsyncResult 实例,可以调用其 get 方法等待提交的任务完成并从工作函数 table 中获取返回值,即None 用于提交的第二个和第三个任务不感兴趣,或者如果工作函数有未捕获的异常会抛出异常,这是提交的第一个任务的情况:

import multiprocessing
import time

def table(a):
    if a == 's':
        raise Exception('I am "s"')
    time.sleep(.1)
    for i in range(100):
        print(i,a*i)

# required for Windows:
if __name__ == '__main__':
    pool = multiprocessing.Pool(3) # create a pool of 3 processes
    result1 = pool.apply_async(table,))
    result2 = pool.apply_async(table,))
    result3 = pool.apply_async(table,))
    try:
        result1.get() # wait for completion of first task
    except Exception as e:
        print(e)
        pool.terminate() # kill all processes in the pool
    else:
        # wait for all submitted tasks to complete:
        pool.close()
        pool.join()
        """
        # or alternatively:
        result2.get() # wait for second task to finish
        result3.get() # wait for third task to finish
        """

打印:

I am "s"