参考
苏神写的parallel_apply link
python中multiprocessing文档 link
前言
在之前的工作中,我经常使用的多进程方式是进程池,在看了苏神写的parallel_apply之后,发现多进程使用队列的方式运行,从显示上(tqdm显示运行进度),灵活性上都更加好 经过更深入的使用之后,我发现其实进程池也可以比较灵活的传递参数。然而,在运行时发现多进程在windows上的支持很不友好,于是打算写一篇在windows上容易使用的多进程方法,当然也可以在linux上使用。
使用多进程来提高程序的运行效率是非常重要的,通常可以在数据处理,矩阵运算等方面应用。
进程在windows上使用的注意点
1.必须得保证你要使用多进程处理的程序是可序列化的,即可以转换为可存储或传输的形式,如generator就是不可序列化的形式,因此想要对jieba分词使用多进程就是不可行的
2.多个进程之间是独立的,所以想在多个进程中更新同一个类属性是不可行的,除非是使用multiprocessing中带的Array,List等方法,并将这些变量作为输入传入到进程中去,不过这个方法我并没有尝试过。
3.每个进程使用的函数都是一次性的,因此若是传入的类函数中,初始时赋值了变量,则这个变量在每一次运行时都会执行,若这个赋值变量的运行时间很长,则会导致使用多进程甚至比单独执行更慢,以下是例子:
import json
class myClass:
def __init__(self):
with open(file_name, 'r', encoding='utf-8') as f:
self.data = json.load(f)
def forward(self, idx):
return self.data[idx]
上述例子中,初始化__init__
函数中,需要载入数据来执行forward
函数,这种情况下将foward放入多进程中,会极大的影响效率,甚至比单独执行更慢。
4.在if __name__ == '__main__'
或者函数中设定的全局变量无法在多进程中使用,因为如果子进程中的代码尝试访问一个全局变量,它所看到的值(如果有)可能和父进程中执行 Process.start 那一刻的值不一样。例子如下:
import json
from multiprocessing import Pool
def forward(idx):
return data[idx]
if __name__ == '__main__':
global data
with open(file_name, 'r', encoding='utf-8') as f:
data = json.load(f)
with Pool(6) as pool:
pool.map(forward, [1,2,3,4,5])
而下面这种方式是可以的,因为全局变量是知识模块级别的变量
import json
from multiprocessing import Pool
with open(file_name, 'r', encoding='utf-8') as f:
data = json.load(f)
def forward(idx):
return data[idx]
if __name__ == '__main__':
with Pool(6) as pool:
pool.map(forward, [1,2,3,4,5])
5.在使用多进程时,应当使用if __name__ == '__main__'
,从而保护程序的入口点,不使用时可能会出现各种各样的错误。
方法
进程池
进程池是比较简单的方法,只需设定好池中进程的数量,使用map(function, args)
即可使用,也可以使用starmap(function, args)
的方法,map
与starmap的区别在于map
仅能传递一个参数,而starmap
可以传递多个参数,例子如下
from multiprocessing import Pool
from tqdm import tqdm
def func(data):
x = data[0]
y = data[1]
return x+y
def func1(x, y):
return x+y
def simple_calculate():
for data in tqdm(list(zip(*[range(10000000), \
range(10000000)]))):
func(data)
if __name__ == '__main__':
with Pool(6) as pool:
pool.map(func, tqdm(list(zip(*[range(10000000), \
range(10000000)]))))
with Pool(6) as pool:
pool.starmap(func1, tqdm(list(zip(*[range(10000000), \
range(10000000)]))))
simple_calculate()
map
调用的是func,每次传入的参数为data
, 而starmap每次传入的参数为x,y
。使用进程池运行上述例子的时间大约是4s,而简单的运算simple_calculation
为3s,显然在简单的数值运算上,多进程并没有很好的性能
队列
队列,顾名思义是将任务放入到队列中,然后每个进程从队列中取出任务执行的过程。因此使用队列进行多进程操作包括:
1.将任务放入到队列中,我们使用Queue.put()
方法
2.启动进程,我们是用Process(target=target, args=args).start()
3.获取结果,我们是用Queue.get()
方法
4.终止进程,我们将终止信号STOP
传入到每个进程中,使得进程停止的方法是iter(iterable, sentinel)
中的sentinel
参数,当iter
获得sentinel
参数时,迭代停止。
队列的方法可以需在worker
函数中,将函数func
的输入变为*data
,将调用的函数funcs
自定义为1个参数或者多个参数,这对于调用别人写好的函数非常方便,下面是使用队列的例子:
from tqdm import tqdm
from multiprocessing import Process, Queue
def worker(input, output):
for idx, func, data in iter(input.get, 'STOP'):
result = func(*data)
output.put((idx, result))
def multi_process(func, iterable, nworkers):
NUMBER_OF_PROCESSES = nworkers
results = []
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for idx, task in enumerate(iterable):
task_queue.put((idx, func, task))
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# get result
for _ in tqdm(range(len(iterable))):
results.append(done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
# the results are unordered, so use original idx to recovery order
results.sort(key=lambda x: x[0])
return [x[1] for x in results]
def funcs(x,y):
return x+y
if __name__ == '__main__':
multi_process(funcs, tqdm(list(zip(*[range(10000000), \
range(10000000)]))), 6)
在这个例子中,执行时间为5-6分钟,远超于普通计算与进程池,在观察中发现,对于1000w数据,将任务放到进程池中的速度为30w/s,而获取结果的速度为3w/s。
总结
可以发现,当计算任务非常简单的时候,使用多进程带来的提升效果不明显,当任务数非常庞大的时候,应当选择进程池的方法提升效率,当任务数不大的时候,可以同时选择进程池与队列的方式。
我在pypi上传了spft(standard python function tools)的包,可以简单快速的上手多进程任务,使用pip install spft
可以下载,使用如下:
from spft.multiprocess import multi_process
multi_process(func, iterable, worker_num, is_queue)
multi_process
可以自动匹配函数使用单参数或多参数,is_queue
决定使用进程池或队列方式。
更新 2022.8.24
最近在使用库处理数据的时候,速度能够提升8倍,但是进度条显示有几个数据没有处理完,同时程序跑到最后不会终止,一开始我怀疑是多进程无法处理大量数据的情况,后来终于发现,原来数据中存在脏乱数据,使用单进程的时候,遇到错误的数据程序会exit,但不会报错,因此在多进程的处理过程中,一个进行挂掉了但不会影响其他进程的处理,但是挂掉的进程无法stop导致程序跑到最后不会终止