问题描述
我在尝试链接线程内存时遇到问题。我希望计数器在线程之间共享内存,所有线程都只计数到某个数字(在这种情况下为 100),最后它返回到主线程。问题是即使有锁,所有线程也只有一个计数
import threading
from threading import Thread,Lock
import time
import multiprocessing
import random
def create_workers(n_threads,counter):
# counter = 0
workers = []
for n in range(n_threads):
worker = DataCampThread('Thread - ' + str(n),counter)
workers.append(worker)
for worker in workers:
worker.start()
for worker in workers:
worker.join()
return counter
def thread_delay(thread_name,num,delay):
num += 1
time.sleep(delay)
print(thread_name,'-------->',num)
return num
class DataCampThread(Thread):
def __init__(self,name,cou):
Thread.__init__(self)
self.name = name
self.counter = cou
delay = random.randint(1,2)
self.delay = delay
self.lock = Lock()
def run(self):
print('Starting Thread:',self.name)
while self.counter < 100:
self.lock.acquire()
self.counter = thread_delay(self.name,self.counter,self.delay)
self.lock.release()
print('Execution of Thread:',self.name,'is complete!')
if __name__ == '__main__':
# create the agent
n_threads = 3#multiprocessing.cpu_count()
counter = 0
create_workers(n_threads,counter)
print(counter)
print("Thread execution is complete!")
解决方法
正如我在评论中提到的,我不太确定您要做什么 - 但这是(希望)加快处理速度的一个不知情的猜测。
根据您对我关于想要避免使用全局变量的初始版本的回答,计数器现在是一个类属性,该属性将自动由类的所有实例共享。每个线程都有自己的名称和随机选择的时间,它在更新名为 counter
的共享类属性之间延迟。
注意:测试代码重新定义了 print()
函数,以防止它一次被多个线程使用。
import threading
from threading import Thread,Lock
import time
import random
MAXVAL = 10
class DataCampThread(Thread):
counter = 0 # Class attribute.
counter_lock = Lock() # Control concurrent access to shared class attribute.
def __init__(self,name):
super().__init__() # Initialize base class.
self.name = name
self.delay = random.randint(1,2)
def run(self):
print('Starting Thread:',self.name)
while True:
with self.counter_lock:
if self.counter >= MAXVAL:
break # Exit while loop (also releases lock).
# self.counter += 1 # DON'T USE - would create an instance-level attribute.
type(self).counter += 1 # Update class attribute.
print(self.name,'-------->',self.counter)
time.sleep(self.delay)
print('Execution of Thread:',self.name,'is complete!')
def main(n_threads,maxval):
''' Create and start worker threads,then wait for them all to finish. '''
workers = [DataCampThread(name=f'Thread #{i}') for i in range(n_threads)]
for worker in workers:
worker.start()
# Wait for all treads to finish.
for worker in workers:
worker.join()
if __name__ == '__main__':
import builtins
def print(*args,**kwargs):
''' Redefine print to prevent concurrent printing. '''
with print.lock:
builtins.print(*args,**kwargs)
print.lock = Lock() # Function attribute.
n_threads = 3
main(n_threads,MAXVAL)
print()
print('Thread execution is complete!')
print('final counter value:',DataCampThread.counter)
示例输出:
Starting Thread: Thread #0
Starting Thread: Thread #1
Thread #0 --------> 1
Starting Thread: Thread #2
Thread #1 --------> 2
Thread #2 --------> 3
Thread #1 --------> 4
Thread #0 --------> 5
Thread #2 --------> 6
Thread #2 --------> 7
Thread #1 --------> 8
Thread #0 --------> 9
Thread #2 --------> 10
Execution of Thread: Thread #1 is complete!
Execution of Thread: Thread #0 is complete!
Execution of Thread: Thread #2 is complete!
Thread execution is complete!
final counter value: 10