一文了解如何在window上轻松实现多进程任务

参考

苏神写的parallel_apply link
python中multiprocessing文档 link

前言

在之前的工作中,我经常使用的多进程方式是进程池,在看了苏神写的parallel_apply之后,发现多进程使用队列的方式运行,从显示上(tqdm显示运行进度),灵活性上都更加好 经过更深入的使用之后,我发现其实进程池也可以比较灵活的传递参数。然而,在运行时发现多进程在windows上的支持很不友好,于是打算写一篇在windows上容易使用的多进程方法,当然也可以在linux上使用。

使用多进程来提高程序的运行效率是非常重要的,通常可以在数据处理,矩阵运算等方面应用。

进程在windows上使用的注意点

1.必须得保证你要使用多进程处理的程序是可序列化的,即可以转换为可存储或传输的形式,如generator就是不可序列化的形式,因此想要对jieba分词使用多进程就是不可行的
2.多个进程之间是独立的,所以想在多个进程中更新同一个类属性是不可行的,除非是使用multiprocessing中带的Array,List等方法,并将这些变量作为输入传入到进程中去,不过这个方法我并没有尝试过。
3.每个进程使用的函数都是一次性的,因此若是传入的类函数中,初始时赋值了变量,则这个变量在每一次运行时都会执行,若这个赋值变量的运行时间很长,则会导致使用多进程甚至比单独执行更慢,以下是例子:

import json
class myClass:
	def __init__(self):
		with open(file_name, 'r', encoding='utf-8') as f:
			self.data = json.load(f)
	
	def forward(self, idx):
		return self.data[idx]

上述例子中,初始化__init__函数中,需要载入数据来执行forward函数,这种情况下将foward放入多进程中,会极大的影响效率,甚至比单独执行更慢。
4.在if __name__ == '__main__'或者函数中设定的全局变量无法在多进程中使用,因为如果子进程中的代码尝试访问一个全局变量,它所看到的值(如果有)可能和父进程中执行 Process.start 那一刻的值不一样。例子如下:

import json
from multiprocessing import Pool
def forward(idx):
	return data[idx]

if __name__ == '__main__':
	global data
	with open(file_name, 'r', encoding='utf-8') as f:
		data = json.load(f)
	with Pool(6) as pool:
		pool.map(forward, [1,2,3,4,5])

而下面这种方式是可以的,因为全局变量是知识模块级别的变量

import json
from multiprocessing import Pool
with open(file_name, 'r', encoding='utf-8') as f:
	data = json.load(f)
def forward(idx):
	return data[idx]

if __name__ == '__main__':
	with Pool(6) as pool:
		pool.map(forward, [1,2,3,4,5])

5.在使用多进程时,应当使用if __name__ == '__main__',从而保护程序的入口点,不使用时可能会出现各种各样的错误。

方法

进程池

进程池是比较简单的方法,只需设定好池中进程的数量,使用map(function, args)即可使用,也可以使用starmap(function, args)的方法,map与starmap的区别在于map仅能传递一个参数,而starmap可以传递多个参数,例子如下

from multiprocessing import Pool
from tqdm import tqdm
def func(data):
    x = data[0]
    y = data[1]
    return x+y

def func1(x, y):
	return x+y

def simple_calculate():
	for data in tqdm(list(zip(*[range(10000000), \
        range(10000000)]))):
        func(data)

if __name__ == '__main__':
    with Pool(6) as pool:
        pool.map(func, tqdm(list(zip(*[range(10000000), \
        range(10000000)]))))
    with Pool(6) as pool:
    	pool.starmap(func1, tqdm(list(zip(*[range(10000000), \
        range(10000000)]))))
    simple_calculate()

map调用的是func,每次传入的参数为data, 而starmap每次传入的参数为x,y。使用进程池运行上述例子的时间大约是4s,而简单的运算simple_calculation为3s,显然在简单的数值运算上,多进程并没有很好的性能

队列

队列,顾名思义是将任务放入到队列中,然后每个进程从队列中取出任务执行的过程。因此使用队列进行多进程操作包括:
1.将任务放入到队列中,我们使用Queue.put()方法
2.启动进程,我们是用Process(target=target, args=args).start()
3.获取结果,我们是用Queue.get()方法
4.终止进程,我们将终止信号STOP传入到每个进程中,使得进程停止的方法是iter(iterable, sentinel)中的sentinel参数,当iter获得sentinel参数时,迭代停止。
队列的方法可以需在worker函数中,将函数func的输入变为*data,将调用的函数funcs自定义为1个参数或者多个参数,这对于调用别人写好的函数非常方便,下面是使用队列的例子:

from tqdm import tqdm
from multiprocessing import Process, Queue

def worker(input, output):
    for idx, func, data in iter(input.get, 'STOP'):
        result = func(*data)
        output.put((idx, result))


def multi_process(func, iterable, nworkers):
    NUMBER_OF_PROCESSES = nworkers
    results = []

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for idx, task in enumerate(iterable):
        task_queue.put((idx, func, task))

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # get result
    for _ in tqdm(range(len(iterable))):
        results.append(done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')

    # the results are unordered, so use original idx to recovery order
    results.sort(key=lambda x: x[0])
    return [x[1] for x in results]

def funcs(x,y):
    return x+y

if __name__ == '__main__':
	multi_process(funcs, tqdm(list(zip(*[range(10000000), \
	range(10000000)]))), 6)

在这个例子中,执行时间为5-6分钟,远超于普通计算与进程池,在观察中发现,对于1000w数据,将任务放到进程池中的速度为30w/s,而获取结果的速度为3w/s。

总结

可以发现,当计算任务非常简单的时候,使用多进程带来的提升效果不明显,当任务数非常庞大的时候,应当选择进程池的方法提升效率,当任务数不大的时候,可以同时选择进程池与队列的方式。
我在pypi上传了spft(standard python function tools)的包,可以简单快速的上手多进程任务,使用pip install spft可以下载,使用如下:

from spft.multiprocess import multi_process
multi_process(func, iterable, worker_num, is_queue)

multi_process可以自动匹配函数使用单参数或多参数,is_queue决定使用进程池或队列方式。

更新 2022.8.24

最近在使用库处理数据的时候,速度能够提升8倍,但是进度条显示有几个数据没有处理完,同时程序跑到最后不会终止,一开始我怀疑是多进程无法处理大量数据的情况,后来终于发现,原来数据中存在脏乱数据,使用单进程的时候,遇到错误的数据程序会exit,但不会报错,因此在多进程的处理过程中,一个进行挂掉了但不会影响其他进程的处理,但是挂掉的进程无法stop导致程序跑到最后不会终止

相关文章

学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习...
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面...
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生...
Can’t connect to local MySQL server through socket \'/v...
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 ...
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服...