问题描述
我为并行处理编码,但需要很长时间。我猜代码有问题。
我想做什么?
供您参考,我想要的结果是[1,2,3,1,3]。
我的代码如下:
import time
import numpy as np
import multiprocessing
data = [1,4,10,5,6,7,8,9,11,12,100,101]
dictionary = [1,3]
data_split = np.array_split(data,4)
Q = multiprocessing.Queue()
def recog_func(data):
result = []
for w in data:
if w in [1,3]:
result.append(w)
print(result)
Q.put(result)
procs=[]
for s in data_split:
p = multiprocessing.Process(target = recog_func,args=(s,))
p.start()
result = Q.get()
procs.extend(result)
for p in procs:
p.join() # 프로세스가 모두 종료될 때까지 기다린다.
end = time.time()
非常感谢您的帮助。
解决方法
这会正常工作。
import time
import numpy as np
import multiprocessing
data = [1,2,3,4,10,5,6,7,8,9,1,11,12,100,101]
dictionary = [1,3]
data_split = np.array_split(data,4)
Q = multiprocessing.Queue()
def recog_func(data):
result = []
for w in data:
if w in [1,3]:
result.append(w)
print(result)
Q.put(result)
procs = []
results = []
for s in data_split:
p = multiprocessing.Process(target=recog_func,args=(s,))
p.start()
results.extend(Q.get())
procs.append(p)
for p in procs:
p.join() # 프로세스가 모두 종료될 때까지 기다린다.
end = time.time()
,
您有两个主要问题。第一个是您有列表 procs
,您需要将使用以下语句创建的 Process
实例添加到其中,以便您稍后可以调用 join
。也就是说,你失踪了:
procs.append(p)
取而代之的是:
procs.extend(result)
这是将结果存储在 procs
列表中。所以稍后当您尝试执行时:
for p in procs:
p.join()
p
不再是一个 Process
实例而是一个 numpy.int64
实例,您现在会得到一个 AttributeError
异常,因为这种类型的对象没有 {{ 1}} 方法。
第二个问题是在下面的循环中:
join
您正在启动每个进程,然后立即等待该进程通过调用 for s in data_split:
p = multiprocessing.Process(target = recog_func,))
p.start()
result = Q.get()
procs.extend(result)
返回其结果,然后再循环返回并启动下一个进程。因此,您仍然没有并行运行这些进程中的任何一个。即使您推迟调用 Q.get
,通过调用 join
,您实际上已经在等待第一个进程完成其所有处理并将其结果写入输出队列,然后再创建并启动下一个进程过程。出于所有意图和目的,第一个过程已完成处理。 在尝试阻止从任何进程检索结果之前,您必须创建并启动所有 3 个进程。但是现在所有三个进程都并行运行,您真的无法确定顺序他们完成并因此将他们的结果写入输出队列。因此,您需要有三个独立的输出队列,每个进程一个如果您希望结果按特定顺序。
最后,您应该意识到创建进程的开销以及读取和写入这些多处理队列的开销,这在非多处理程序中是没有的。为了证明额外的开销是合理的,您的函数 Q.get
需要充分占用 CPU,我不相信。如果你做计时,我相信你会发现你并没有取得更好的成绩。
recog_func
打印:
import time
import numpy as np
import multiprocessing
data = [1,4)
def recog_func(data,q):
result = []
for w in data:
if w in [1,3]:
result.append(w)
print(result)
q.put(result)
queues = []
procs = []
results = []
for s in data_split:
q = multiprocessing.Queue()
p = multiprocessing.Process(target = recog_func,q))
procs.append(p)
queues.append(q)
p.start()
for q in queues:
result = q.get()
results.extend(result)
for p in procs:
p.join() # 프로세스가 모두 종료될 때까지 기다린다.
print(results)
end = time.time()