问题描述
我正在构建一个 API,该 API 具有通过 websockets 进行通信的组件,目前使用高速公路库。我试图弄清楚如何从他们的 github(特别是 this example)修改一个示例客户端,这样我就可以向客户端添加额外的功能,我可以从外部调用,类似于下面的修改版本他们的例子,这是我目前最好的尝试。我遇到的两个主要问题:
-
如何获取高速公路正在使用的协议类的实例?我似乎可以在我的示例中的
_,client = loop.run_until_complete(coro)
行中获得对它的引用,但它似乎对我不起作用。 -
假设问题 1 有解决方案,那么在示例底部运行高速公路客户端和 while True 循环的最佳方法是什么?我应该在线程中抛出
loop.run_forever()
行吗?
这是我在他们的 github 上修改后的 example 版本,我想做什么:
import asyncio
import random
from autobahn.asyncio.websocket import WebSocketClientProtocol,WebSocketClientFactory
class MyClientProtocol(WebSocketClientProtocol):
def __init__(self):
self.super().__init__()
self.state = -1
def onConnect(self,response):
print("Server connected: {0}".format(response.peer))
async def onopen(self):
print("WebSocket connection open.")
# start sending messages every second ..
while True:
self.sendMessage("Hello,world!".encode('utf8'))
self.sendMessage(b"\x00\x01\x03\x04",isBinary=True)
await asyncio.sleep(1)
def onMessage(self,payload,isBinary):
if isBinary:
print("Binary message received: {0} bytes".format(len(payload)))
else:
print("Text message received: {0}".format(payload.decode('utf8')))
def onClose(self,wasClean,code,reason):
print("WebSocket connection closed: {0}".format(reason))
# new setter function I want to be able to use
def sendNextAction(self,next_action: int):
print("Next action is {}".format(next_action))
self.state += next_action
# new getter function I want to be able to use
def getCurrentState(self):
return self.state
if __name__ == '__main__':
factory = WebSocketClientFactory("ws://127.0.0.1:9000")
factory.protocol = MyClientProtocol
loop = asyncio.get_event_loop()
coro = loop.create_connection(factory,'127.0.0.1',9000)
_,client = loop.run_until_complete(coro) # my attempt to get client reference
loop.run_forever()
loop.close()
# this is how I want to interact with the MyClientProtocol outside of it
actions = [1,2,3,4,5]
while True:
client.sendNextAction(random.choice(actions))
curr_state = client.getCurrentState()
print("Current state is Now {}".format(curr_state))
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)