与线程共享的内存

问题描述

我在尝试链接线程内存时遇到问题。我希望计数器在线程之间共享内存,所有线程都只计数到某个数字(在这种情况下为 100),最后它返回到主线程。问题是即使有锁,所有线程也只有一个计数

import threading
from threading import Thread,Lock
import time
import multiprocessing
import random

def create_workers(n_threads,counter):
    # counter = 0
    workers = []
    for n in range(n_threads):
        worker = DataCampThread('Thread - ' + str(n),counter)
        workers.append(worker)

    for worker in workers:
        worker.start()

    for worker in workers:
        worker.join()

    return counter

def thread_delay(thread_name,num,delay):
    num += 1
    time.sleep(delay)
    print(thread_name,'-------->',num)
    return num

class DataCampThread(Thread):
    def __init__(self,name,cou):
        Thread.__init__(self)
        self.name = name
        self.counter = cou
        delay = random.randint(1,2)
        self.delay = delay
        self.lock = Lock()

    def run(self):
        print('Starting Thread:',self.name)
        while self.counter < 100:
            self.lock.acquire()
            self.counter = thread_delay(self.name,self.counter,self.delay)
            self.lock.release()
        print('Execution of Thread:',self.name,'is complete!')

if __name__ == '__main__':
    # create the agent
    n_threads = 3#multiprocessing.cpu_count()
    counter = 0
    create_workers(n_threads,counter)
    print(counter)
    print("Thread execution is complete!")

解决方法

正如我在评论中提到的,我不太确定您要做什么 - 但这是(希望)加快处理速度的一个不知情的猜测。

根据您对我关于想要避免使用全局变量的初始版本的回答,计数器现在是一个类属性,该属性将自动由类的所有实例共享。每个线程都有自己的名称和随机选择的时间,它在更新名为 counter 的共享类属性之间延迟。

注意:测试代码重新定义了 print() 函数,以防止它一次被多个线程使用。

import threading
from threading import Thread,Lock
import time
import random

MAXVAL = 10


class DataCampThread(Thread):

    counter = 0  # Class attribute.
    counter_lock = Lock()  # Control concurrent access to shared class attribute.

    def __init__(self,name):
        super().__init__()  # Initialize base class.
        self.name = name
        self.delay = random.randint(1,2)

    def run(self):
        print('Starting Thread:',self.name)
        while True:
            with self.counter_lock:
                if self.counter >= MAXVAL:
                    break  # Exit while loop (also releases lock).
#                self.counter += 1  # DON'T USE - would create an instance-level attribute.
                type(self).counter += 1  # Update class attribute.
                print(self.name,'-------->',self.counter)
            time.sleep(self.delay)
        print('Execution of Thread:',self.name,'is complete!')


def main(n_threads,maxval):
    ''' Create and start worker threads,then wait for them all to finish. '''

    workers = [DataCampThread(name=f'Thread #{i}')  for i in range(n_threads)]

    for worker in workers:
        worker.start()

    # Wait for all treads to finish.
    for worker in workers:
        worker.join()


if __name__ == '__main__':

    import builtins

    def print(*args,**kwargs):
        ''' Redefine print to prevent concurrent printing. '''
        with print.lock:
            builtins.print(*args,**kwargs)

    print.lock = Lock()  # Function attribute.

    n_threads = 3
    main(n_threads,MAXVAL)
    print()
    print('Thread execution is complete!')
    print('final counter value:',DataCampThread.counter)

示例输出:

Starting Thread: Thread #0
Starting Thread: Thread #1
Thread #0 --------> 1
Starting Thread: Thread #2
Thread #1 --------> 2
Thread #2 --------> 3
Thread #1 --------> 4
Thread #0 --------> 5
Thread #2 --------> 6
Thread #2 --------> 7
Thread #1 --------> 8
Thread #0 --------> 9
Thread #2 --------> 10
Execution of Thread: Thread #1 is complete!
Execution of Thread: Thread #0 is complete!
Execution of Thread: Thread #2 is complete!

Thread execution is complete!
final counter value: 10