如何在另一个无限循环线程中与生产者一起正确管理Websocket服务器

问题描述

我正在努力解决一个容易解决但难以解决的问题。 我想设置一个websocket服务器,将消息广播到每个客户端。消息来自另一个API,该API需要进行事件的无限循环。我决定将这个无限循环放在另一个线程中,但是我很难管理线程之间的交互作用。

我确实需要主线程能够控制 Producer 线程,例如更改“客户端”列表或customMessage dict。由于无限循环将在run()中运行,因此我需要确保所有内容都是线程安全的。

这是我的代码。我在while循环中用创建随机消息以及基于客户端的自定义消息代替了API事件处理。我们等待1秒以模仿两次事件之间的时间。

from SimpleWebSocketServer import SimpleWebSocketServer,WebSocket
from random import random
import time
import threading



class Producer(threading.Thread):

    def __init__(self):
        self.clients = []
        self.customMessage = {}
        self.lock = threading.RLock()
        self.session = self.connect()

        super(Producer,self).__init__()

    def run(self):
        while True:
            # ultimately this part will process received events from another api

            message = f"random_number : {random()}"
            for client in self.clients:
                # send random number to everybody
                client.sendMessage(message)

                # send a custom message
                if client in self.customMessage:
                    client.sendMessage(self.customMessage[client])
            time.sleep(1)

    def addCLient(self,client):
        with lock:
            self.clients.append(client)

    def removeCLient(self,client):
        with lock:
            self.clients.remove(client)

    def manageCustomMessage(self,client,message):
        with lock:
            self.customMessage[client] = message



class Consumer(WebSocket):

    def handleMessage(self):
        producer.manageCustomMessage(self,self.data)

    def handleConnected(self):
       print(self.address,'connected')
       producer.addCLient(self)

    def handleClose(self):
       producer.removeCLient(self)
       print(self.address,'closed')


producer = Producer()
producer.setDaemon(True)
producer.start()

server = SimpleWebSocketServer('127.0.0.1',8080,Consumer)
server.serveforever()

我的问题是:以我的方式使用锁是否可以?添加除运行到类线程之外的方法是否还好?

我想避免排队,因为我将需要从Producer类中获得许多属性,而我必须从主类中更改这些属性

预先感谢

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)