基于队列的通信:发送返回队列是个好主意吗? 共享内存类型

问题描述

我正在使用以下方法来等待另一个线程中运行的异步操作的结果:

def client(worker_queue,message):
    answer_queue = queue.Queue(maxsize=1)
    worker_queue.put((answer_queue,message))
    result = answer_queue.get(timeout=10)


def worker():
    while True:
        answer_queue,message = worker_queue.get()
        result = do_someting_with(message)
        answer_queue.put(result)
        worker_queue.task_done()

(primitiv worker 只是一个例子。在其他情况下,可能需要在多个回调之间传递“answer_queue”)

这是个好主意,还是会遇到问题(例如内存管理)?

有没有更好的方法来做到这一点?

我知道 asyncio 有像 futures 这样的东西来处理这样的问题,但目前我正在寻找(也)适用于多线程的东西。

解决方法

Python 使用内存地址来访问类变量及其方法。使用队列只是确保每个工作人员都有一个唯一的内存地址位置来放置其答案。

确保,如果变量仍然是局部变量,当您不再需要该变量时,重新分配它,以便垃圾回收可以释放队列正在使用的内存,或者使用 del 关键字清除内存以手动执行,例如 del answer_queue

您可以使用任何数据类型来完成在 worker 和 client 之间传输数据的作用,因为所有类方法都是通过内存地址访问的;然而,最常见的数据传输方式是:

使用队列

queue.Queue() 已经非常优化并且非常通用,因此它是一种在客户端和工作器之间执行通信的可靠方式。

import queue
import datetime
from threading import Thread
def queue_client(worker_queue,message):
    answer_queue = queue.Queue(maxsize=1)
    worker_queue.put((answer_queue,message))
    result = answer_queue.get(timeout=60)
    print('Bytes received from worker:',len(result))

def queue_worker(worker_queue):
    answer_queue,message = worker_queue.get()
    do_something = lambda x: x[::-1]
    result = do_something(message)
    answer_queue.put(result)

if __name__ == '__main__':
    large_message = 'X' * (2<<30) #2GB

    worker_queue = queue.Queue()
    client = Thread(target=queue_client,args=(worker_queue,large_message,))
    worker = Thread(target=queue_worker,))
    start_time = datetime.datetime.now()
    client.start()
    worker.start()
    client.join()
    worker.join()
    dt = datetime.datetime.now() - start_time
    print('Time elapsed using Queue:',dt.total_seconds()) #~1.47 secs on my machine

共享内存类型

由于在多线程时可以直接访问变量地址的内存,所以可以使用共享内存;这通常是访问数据的最快方式,但您需要某种类型的数据容器对象。

一个非常基本的示例(可以将工作人员生成、回答提交等添加到管理器中):

import datetime
from threading import Thread

class DataManager:

    def __init__(self,):
        self.tasks = {
            #worker_id: message
        }
        self.answers = {
            #worker_id: answer
        }

def shared_client(datamanager,worker_id,message):
    datamanager.tasks[worker_id] = message
    while not datamanager.answers.get(worker_id,None): #Wait for answer
        pass
    result = datamanager.answers[worker_id]
    print('Bytes received from worker:',len(result))
    

def shared_worker(data_manager,worker_id):
    while not data_manager.tasks.get(worker_id,None): #Wait for task to get assigned
        pass

    message = data_manager.tasks[worker_id]
    do_something = lambda x: x[::-1]
    result = do_something(message)
    data_manager.answers[worker_id] = result

if __name__ == '__main__':
    large_message = 'X' * (2<<30) #2GB
    data_manager = DataManager()

    worker_id = 0
    client = Thread(
        target=shared_client,args=(data_manager,)
    )
    worker = Thread(target=shared_worker,worker_id))
    start_time = datetime.datetime.now()
    client.start()
    worker.start()
    client.join()
    worker.join()
    dt = datetime.datetime.now() - start_time
    print('Time elapsed using Shared Memory:',dt.total_seconds()) #~1.44 secs on my machine

如果您使用诸如 Multiprocessing.Value 和 Multiprocessing.Array (link) 之类的共享内存类型/ctypes,可能会有更优雅(甚至可能更快)的解决方案。

管道

管道作为一种连接,一端在侦听,另一端在发送数据。 重要的是,当发送大于 ~32MB 的数据包时,这比队列更有利,同时多处理。此外,它是可序列化的,不像 queue.Queue().
使用管道的示例:

import datetime
from threading import Thread
from multiprocessing import Pipe

def client(conn,message):
    conn.send(message)      #Send message to worker
    result = conn.recv()    #Receive result
    print('Bytes received from worker:',len(result))

def worker(conn):
    message = conn.recv() #Get message from client
    do_something = lambda x: x[::-1]
    result = do_something(message)
    conn.send(result) #Send result to client

if __name__ == '__main__':
    large_message = 'X' * (2<<30) #2GB
    client_conn,worker_conn = Pipe(duplex=True) #Bidirectional pipe
    client_process = Thread(target=client,args=(client_conn,large_message))
    worker_process = Thread(target=worker,args=(worker_conn,))
    
    start_time = datetime.datetime.now()
    client_process.start()
    worker_process.start()

    client_process.join()
    worker_process.join()
    dt = datetime.datetime.now() - start_time

    print('Time elapsed using Pipe:',dt.total_seconds()) #~9.07 secs on my machine

代理

代理在客户端上注册函数,并允许通过调用底层公开函数的工作线程来执行它们。当您的工作人员与您的客户端在不同的计算机上时,这会比较棘手,但对于集群计算来说是必要的。