问题描述
我正在努力解决一个容易解决但难以解决的问题。 我想设置一个websocket服务器,将消息广播到每个客户端。消息来自另一个API,该API需要进行事件的无限循环。我决定将这个无限循环放在另一个线程中,但是我很难管理线程之间的交互作用。
我确实需要主线程能够控制 Producer 线程,例如更改“客户端”列表或customMessage dict。由于无限循环将在run()中运行,因此我需要确保所有内容都是线程安全的。
这是我的代码。我在while循环中用创建随机消息以及基于客户端的自定义消息代替了API事件处理。我们等待1秒以模仿两次事件之间的时间。
from SimpleWebSocketServer import SimpleWebSocketServer,WebSocket
from random import random
import time
import threading
class Producer(threading.Thread):
def __init__(self):
self.clients = []
self.customMessage = {}
self.lock = threading.RLock()
self.session = self.connect()
super(Producer,self).__init__()
def run(self):
while True:
# ultimately this part will process received events from another api
message = f"random_number : {random()}"
for client in self.clients:
# send random number to everybody
client.sendMessage(message)
# send a custom message
if client in self.customMessage:
client.sendMessage(self.customMessage[client])
time.sleep(1)
def addCLient(self,client):
with lock:
self.clients.append(client)
def removeCLient(self,client):
with lock:
self.clients.remove(client)
def manageCustomMessage(self,client,message):
with lock:
self.customMessage[client] = message
class Consumer(WebSocket):
def handleMessage(self):
producer.manageCustomMessage(self,self.data)
def handleConnected(self):
print(self.address,'connected')
producer.addCLient(self)
def handleClose(self):
producer.removeCLient(self)
print(self.address,'closed')
producer = Producer()
producer.setDaemon(True)
producer.start()
server = SimpleWebSocketServer('127.0.0.1',8080,Consumer)
server.serveforever()
我的问题是:以我的方式使用锁是否可以?添加除运行到类线程之外的方法是否还好?
我想避免排队,因为我将需要从Producer类中获得许多属性,而我必须从主类中更改这些属性。
预先感谢
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)