Python aiohttp.web 模块，WebSocketResponse() 实例源码
我们从 Python 开源项目中提取了以下 37 个代码示例，用于说明如何使用 aiohttp.web.WebSocketResponse()。
async def wshandler(request):
    """Serve one websocket client, starting the game loop when it is idle.

    The socket is kept in ``app["sockets"]`` while the connection lives.
    Each text message is printed and answered with a key-code notice; a
    close or error message ends the session.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    app = request.app
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    app["sockets"].append(ws)
    try:
        if not app["game_is_running"]:
            # Fire-and-forget: the loop runs independently of this handler.
            asyncio.ensure_future(game_loop(app))
        while True:
            msg = await ws.receive()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("pressed key code: {}".format(msg.data))
            elif msg.tp in (web.MsgType.close, web.MsgType.error):
                break
    finally:
        # Unregister even if receive() raises, so no dead socket stays listed.
        app["sockets"].remove(ws)
    print("Closed connection")
    return ws
async def wshandler(request):
    """Serve one websocket client and keep it registered while connected.

    Each text message is printed and answered with a key-code notice; a
    close or error message ends the session.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    app = request.app
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    app["sockets"].append(ws)
    try:
        while True:
            msg = await ws.receive()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("pressed key code: {}".format(msg.data))
            elif msg.tp in (web.MsgType.close, web.MsgType.error):
                break
    finally:
        # Unregister even if receive() raises, so no dead socket stays listed.
        app["sockets"].remove(ws)
    print("Closed connection")
    return ws
async def get(self, request):
    """Join the shared socket list and fan queue items out on every text message.

    Already connected sockets are told that someone joined. For each text
    message received, one item is taken from ``self.queue`` per registered
    socket and sent to it as JSON. Any non-text message ends the session,
    after which the remaining sockets are told that someone disconnected.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``WebSocketResponse`` created for this connection.
    """
    resp = WebSocketResponse()
    await resp.prepare(request)
    print('Someone joined.')

    sockets = request.app['sockets']
    for ws in sockets:
        ws.send_str('Someone joined')
    sockets.append(resp)

    try:
        while True:
            msg = await resp.receive()
            if msg.tp != MsgType.text:
                break
            for ws in sockets:
                redis_info = self.queue.get()
                ws.send_str(json.dumps(redis_info))
    finally:
        # Always unregister, otherwise later broadcasts hit a dead socket.
        sockets.remove(resp)

    print('Someone disconnected.')
    for ws in sockets:
        ws.send_str('Someone disconnected.')
    return resp
async def websocket(req):
    """Wait for a valid login message, then hand the socket to its room.

    Non-text messages and messages that fail to parse are ignored. The
    first text message of type ``"login"`` carrying a non-empty room name
    (truncated to 32 characters) completes the handshake. If the socket
    ends before that happens it is closed and returned.

    Args:
        req: the incoming request to upgrade to a websocket.

    Returns:
        The websocket response on a failed handshake, otherwise whatever
        the room's own ``websocket`` coroutine returns.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(req)
    async for msg in ws:
        if msg.type != WSMsgType.TEXT:
            continue
        try:
            msg = json.loads(msg.data)
            if msg["type"] != "login":
                continue
            room = msg["data"]["room"][:32]
            if room:
                break
        except (ValueError, KeyError, TypeError):
            # Any parsing error, just wait for another message
            continue
    else:
        # Something went wrong with the handshake. Kick
        # the client and let them reconnect.
        await ws.close()
        return ws
    if room not in rooms:
        # NOTE(review): the result is discarded; presumably Room() adds
        # itself to `rooms` - confirm against the Room class.
        Room(room)
    return await rooms[room].websocket(ws, msg["data"])
# After all the custom routes, handle everything else by loading static files.
async def __call__(self):
    """Echo websocket: answer every text message with ``<data>/answer``.

    A text message equal to ``'close'`` closes the socket instead.

    Returns:
        An empty dict once the socket has finished.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(self.request)

    async for msg in ws:
        if msg.tp == aiohttp.WSMsgType.text:
            if msg.data == 'close':
                await ws.close()
            else:
                ws.send_str(msg.data + '/answer')
        elif msg.tp == aiohttp.WSMsgType.error:
            # Lazy %s formatting: '{0:s}'.format(exc) raises TypeError because
            # exception objects (and None) reject a non-empty format spec.
            logger.debug('ws connection closed with exception %s',
                         ws.exception())

    logger.debug('websocket connection closed')
    return {}
def websocket_route_factory(execute_cmd, base_dispatcher):
    """Build the websocket handler for connections that require a handshake.

    Args:
        execute_cmd: forwarded to ``_on_connection_main_loop``.
        base_dispatcher: first dispatcher of the composed dispatcher.

    Returns:
        The ``on_connection`` coroutine function.
    """
    async def on_connection(request):
        # Upgrade the request, run the handshake, then serve the main loop.
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        context = WebsocketClientConnectionContext(ws)
        client_dispatcher = client_dispatcher_factory(context)
        session = SessionComponent()
        parts = [base_dispatcher, client_dispatcher, session.get_dispatcher()]
        dispatcher = Composeddispatcher(parts)
        context.logger.info('Connection started')

        try:
            # The handshake needs the regular dispatchers plus its own I/O one.
            handshake_parts = [dispatcher, handshake_io_dispatcher_factory(context)]
            handshake_dispatcher = Composeddispatcher(handshake_parts)
            await asyncio_perform(handshake_dispatcher, session.handshake())
        except (HandshakeError, WebsocketConnectionClosed):
            context.logger.info('Bad handshake,closing connection')
            return ws

        context.logger.debug('Handshake done,`%s` is authenticated.' % session.id)
        await _on_connection_main_loop(execute_cmd, context, dispatcher)
        context.logger.info('Connection closed by client')
        return ws

    return on_connection
def websocket_handler(request):
    """Handle a new socket connection.

    Generator-based coroutine. The socket is subscribed to the application
    dispatcher under a fresh UUID and incoming messages are drained until
    the socket is closed, reports an error, or ``receive()`` raises
    ``RuntimeError``.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    logger.debug('New websocket connection.')
    websocket = web.WebSocketResponse()
    yield from websocket.prepare(request)

    # Named subscription_id so the `uuid` module name is not shadowed.
    subscription_id = uuid4()
    dispatcher = request.app['dispatcher']
    dispatcher.subscribe(subscription_id, websocket)
    try:
        while True:
            # Consume input buffer
            try:
                msg = yield from websocket.receive()
            except RuntimeError as e:
                logger.debug('Websocket exception: %s', str(e))
                break

            if msg.type == aiohttp.WSMsgType.CLOSED:
                logger.debug('Websocket closed')
                break
            elif msg.type == aiohttp.WSMsgType.ERROR:
                logger.debug('Websocket exception: %s', websocket.exception())
                break
    finally:
        # Unsubscribe on every exit path so the dispatcher holds no dead socket.
        dispatcher.unsubscribe(subscription_id)
    return websocket
async def get(self):
    """Serve one websocket client and dispatch its JSON 'rpc' payloads.

    Text messages are parsed as JSON; payloads whose ``type`` is ``'rpc'``
    are passed to ``self.handle_rpc_call``. When the socket ends, an
    authenticated client is unsubscribed and dropped from the client table.

    Returns:
        The ``web.WebSocketResponse`` stored on ``self.websocket``.
    """
    self.logger = self.request.app.logger
    self.websocket = web.WebSocketResponse()
    self.client = Basicclient(
        self.websocket,
        ip_address=get_ip_address_from_request(self.request))
    await self.websocket.prepare(self.request)
    self.logger.info('WebSocket client connected from {}'.format(self.client.ip_address))

    async for msg in self.websocket:
        if msg.type == WSMsgType.TEXT:
            self.logger.info('Got WebSocket data: {}'.format(log_short(msg.data)))
            payload = json.loads(msg.data)
            if payload['type'] == 'rpc':
                response = await self.handle_rpc_call(payload)
                # NOTE(review): the response is only logged in this block;
                # confirm whether handle_rpc_call sends it itself.
                self.logger.info('Sending WebSocket data: {}'.format(log_short(response)))
        elif msg.type == WSMsgType.ERROR:
            self.logger.error('WebSocket error: {}'.format(self.websocket.exception()))
    else:
        # NOTE(review): indentation was lost in the source; this branch is
        # read as the loop's `else` because of its message - confirm.
        self.logger.info('WebSocket connection closed')

    if self.client.authenticated:
        await self.request.app['ps'].unsubscribe_all(self.client)
        del self.request.app['clients'][self.client.id]
    return self.websocket
async def ws_handler(request):
    """Run incoming text messages through the validate/load pipeline.

    Each received message consumes the next pipeline step
    (``validate_token``, then ``load_score``). The step's reply is sent
    back; a falsy result closes the socket and ends the session.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    ws = web.WebSocketResponse(timeout=60)
    await ws.prepare(request)

    pipeline = iter([validate_token, load_score])
    async for msg in ws:
        # NOTE(review): a third message exhausts the pipeline and next()
        # raises StopIteration - confirm the intended behaviour.
        cmd = next(pipeline)
        if msg.tp == aiohttp.MsgType.text:
            # Compare the payload: the message object never equals a string.
            if msg.data == "close":
                await ws.close()
            else:
                # Separate name so the loop variable is not rebound.
                result, reply = await cmd(msg.data)
                ws.send_str(reply)
                if not result:
                    await ws.close()
                    break
        elif msg.tp == aiohttp.MsgType.error:
            handle_error(ws)
    return ws
async def wshandler(request):
    """Serve one websocket client and manage the shared game-loop task.

    The game loop is started when none exists or the previous one was
    cancelled, and it is cancelled again when the last socket leaves.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    app = request.app
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    if app["game_loop"] is None or app["game_loop"].cancelled():
        app["game_loop"] = asyncio.ensure_future(game_loop(app))
        # this is required to propagate exceptions
        app["game_loop"].add_done_callback(
            lambda t: t.result() if not t.cancelled() else None)

    app["sockets"].append(ws)
    try:
        while True:
            msg = await ws.receive()
            if msg.tp == web.MsgType.text:
                ws.send_str("pressed key code: {}".format(msg.data))
                print("Got message %s" % msg.data)
            elif msg.tp in (web.MsgType.close, web.MsgType.error):
                break
    finally:
        # Run the bookkeeping on every exit path so the loop cannot outlive
        # its last client after an unexpected error.
        app["sockets"].remove(ws)
        if not app["sockets"]:
            print("Stopping game loop")
            app["game_loop"].cancel()

    print("Closed connection")
    return ws
async def wshandler(request):
    """Serve one client, multiplexing socket input with game-loop ticks.

    Two tasks are kept pending at all times: one waiting for the next
    websocket message and one waiting on the shared ``tick`` object.
    Whichever completes first is handled and then re-armed.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while True:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            # tick is acquired before wait() and released after it fires below.
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task, tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("pressed key code: {}".format(msg.data))
            elif msg.tp in (web.MsgType.close, web.MsgType.error):
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws
async def wshandler(request):
    """Serve one client, multiplexing socket input with game-loop ticks.

    Two tasks are kept pending at all times: one waiting for the next
    websocket message and one waiting on the shared ``tick`` object.
    Whichever completes first is handled and then re-armed.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while True:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            # tick is acquired before wait() and released after it fires below.
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        # The wait call was truncated in the source; restored so that `done`
        # is actually bound before it is inspected.
        done, pending = await asyncio.wait(
            [recv_task, tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("pressed key code: {}".format(msg.data))
            elif msg.tp in (web.MsgType.close, web.MsgType.error):
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws
async def wshandler(request):
    """Serve one game client: create its player, join games, relay key presses.

    JSON text messages are interpreted as follows: a bare integer is a key
    code for the existing player; a list is a command, either
    ``["new_player", name]`` while no player exists or ``["join", ...]``
    afterwards. Joining a stopped game resets the world and starts the
    game loop.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    print("Connected")
    app = request.app
    game = app["game"]
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    player = None
    try:
        while True:
            msg = await ws.receive()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                data = json.loads(msg.data)
                # Exact type checks on purpose: isinstance() would also
                # accept JSON booleans as key codes.
                if type(data) is int and player:
                    # Interpret as key code
                    player.keypress(data)
                if type(data) is not list or not data:
                    # Not a command (or an empty one): nothing to dispatch.
                    continue

                if not player:
                    if data[0] == "new_player":
                        player = game.new_player(data[1], ws)
                elif data[0] == "join":
                    if not game.running:
                        game.reset_world()
                        print("Starting game loop")
                        asyncio.ensure_future(game_loop(game))
                    game.join(player)
            elif msg.tp in (web.MsgType.close, web.MsgType.error):
                # Errors end the session too, as in the sibling handlers.
                break
    finally:
        # Always tell the game, even when the handler dies with an exception.
        if player:
            game.player_disconnected(player)

    print("Closed connection")
    return ws
def __init__(self, request: web.Request) -> None:
    """Remember the request and create a websocket response for it.

    Args:
        request: the request this object will serve.
    """
    # Both state flags start out cleared.
    self.running = False
    self.client_started = False
    # No listener is attached at construction time.
    self.listener = None  # type: Optional[event.EventListener]
    self.request = request
    self.ws = web.WebSocketResponse()
async def wshandler(request):
    """Greet text messages and echo binary ones until the client closes.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.type == web.MsgType.text:
            await ws.send_str("Hello,{}".format(msg.data))
        elif msg.type == web.MsgType.binary:
            await ws.send_bytes(msg.data)
        elif msg.type == web.MsgType.close:
            break

    return ws
def websocket(route, authenticate=False):
    """Decorator factory that turns a coroutine into a websocket handler.

    The decorated coroutine is called as ``func(ws, params, **kwargs)``,
    where ``kwargs`` holds ``user`` when ``authenticate`` is true.

    Args:
        route: route string, normalised with ``_clean_route``.
        authenticate: when true, resolve the ``token`` query parameter
            through ``handle_auth``.

    Returns:
        The decorator; the wrapper it produces carries a ``route`` attribute.
    """
    def inner(func):
        handler = asyncio.coroutine(func)

        @ft.wraps(handler)
        @asyncio.coroutine
        def wrapper(request):
            params = request.GET
            extra = {}
            if authenticate:
                extra["user"] = handle_auth(params.get("token"))

            ws = WebSocketResponse()
            try:
                yield from ws.prepare(request)
                yield from handler(ws, params, **extra)
            except Exception as err:  # pragma: no cover
                logger.error(str(err))
            return ws

        # Record method, cleaned route and handler on the wrapper itself.
        wrapper.route = ("GET", _clean_route(route), wrapper)
        return wrapper

    return inner
def websocket_pubsub(route, authenticate=False):
    """Decorator factory for websocket handlers backed by a pubsub manager.

    If ``authenticate=False`` the signature of your coroutine should be
    ``func(ws: WebSocketResponse, params: MultiDict, manager: SubscriptionManager)``;
    otherwise an additional keyword argument ``user`` is available, that
    being the authenticated user making the request.

    Args:
        route: route string, normalised with ``_clean_route``.
        authenticate: when true, resolve the ``token`` query parameter
            through ``handle_auth``.

    Returns:
        The decorator; the wrapper it produces carries a ``route`` attribute.
    """
    def inner(func):
        func = asyncio.coroutine(func)

        @ft.wraps(func)
        @asyncio.coroutine
        def wrapper(request):
            params = request.GET
            kwargs = {}
            if authenticate:
                kwargs["user"] = handle_auth(params.get("token", None))

            redis_ = yield from get_async_redis()
            manager = SubscriptionManager(redis_)
            kwargs["manager"] = manager

            ws = WebSocketResponse()
            try:
                yield from ws.prepare(request)
                # Pass params positionally, as documented above and as the
                # sibling `websocket` decorator does.
                yield from func(ws, params, **kwargs)
            except Exception as err:  # pragma: no cover
                logger.error(str(err))
            finally:
                yield from manager.stop()
            return ws

        # cleanup the route
        route_ = _clean_route(route)
        # Same (method, route, handler) triple as the `websocket` decorator.
        wrapper.route = ("GET", route_, wrapper)
        return wrapper

    return inner
def anonymous_websocket_route_factory(execute_cmd, base_dispatcher):
    """Build the websocket handler for anonymous (no handshake) connections.

    Args:
        execute_cmd: forwarded to ``_on_connection_main_loop``.
        base_dispatcher: first dispatcher of the composed dispatcher.

    Returns:
        The ``on_connection`` coroutine function.
    """
    async def on_connection(request):
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        context = WebsocketClientConnectionContext(ws)
        client_dispatcher = client_dispatcher_factory(context)
        dispatcher = Composeddispatcher([base_dispatcher, client_dispatcher])
        context.logger.info('Connection started (anonymous connection)')

        # The main loop takes (execute_cmd, context, dispatcher), as in the
        # authenticated factory; `context` was missing from this call.
        await _on_connection_main_loop(execute_cmd, context, dispatcher)
        return ws

    return on_connection
async def __call__(self):
    """Serve JSON operations over a websocket, one transaction per GET.

    The request's own transaction is aborted first. Each text message is
    decoded with ``ujson``: ``op == 'close'`` closes the socket,
    ``op == 'GET'`` is handled inside a fresh transaction that is always
    aborted afterwards, and any other operation closes the socket.

    Returns:
        An empty dict once the socket has finished.
    """
    tm = get_tm(self.request)
    await tm.abort(self.request)
    ws = web.WebSocketResponse()
    await ws.prepare(self.request)

    async for msg in ws:
        if msg.tp == aiohttp.WSMsgType.text:
            message = ujson.loads(msg.data)
            if message['op'] == 'close':
                await ws.close()
            elif message['op'] == 'GET':
                txn = await tm.begin(request=self.request)
                try:
                    await self.handle_ws_request(ws, message)
                except Exception:
                    await ws.close()
                    raise
                finally:
                    # only currently support GET requests which are *never*
                    # supposed to be commits
                    await tm.abort(txn=txn)
            else:
                await ws.close()
        elif msg.tp == aiohttp.WSMsgType.error:
            # Lazy %s formatting: '{0:s}'.format(exc) raises TypeError because
            # exception objects (and None) reject a non-empty format spec.
            logger.debug('ws connection closed with exception %s',
                         ws.exception())

    logger.debug('websocket connection closed')
    return {}
async def get(self, request):
    """Register a websocket client and announce the list of online users.

    The socket is stored in ``WebsocketHandler.socket_list`` together with
    an id built from the peer address, and removed again when it ends.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    peername = request.transport.get_extra_info('peername')
    if peername is not None:
        # Only the first two items: IPv6 peernames are 4-tuples.
        host, port = peername[:2]
        ws_id = "{}:{}".format(host, port)
    else:
        # Without a peer address ws_id would otherwise be undefined below.
        ws_id = "unknown"

    ws = web.WebSocketResponse()
    await ws.prepare(request)

    print('WS connection open by', ws_id)
    WebsocketHandler.socket_list.append((ws, ws_id))
    try:
        online_ids = ','.join(
            '"' + entry[1] + '"' for entry in WebsocketHandler.socket_list)
        online_msg = '{"action":"online_user","data" : [' + online_ids + ']}'
        core.notify_all(msg=online_msg)

        async for msg in ws:
            if msg.tp == aiohttp.MsgType.text:
                if msg.data == 'close':
                    print('CLOSE MESSAGE RECEIVED')
                    await ws.close()
                else:
                    # Analyse message sent by client and send response if needed
                    data = msg.json()
                    if data["action"] == "user_info":
                        print("WebsocketHandler", data["action"])
            elif msg.tp == aiohttp.MsgType.error:
                print('ws connection closed with exception %s' % ws.exception())
    finally:
        print('WS connection closed for', ws_id)
        WebsocketHandler.socket_list.remove((ws, ws_id))

    return ws
async def client_handler(request):
    """Serve one user websocket and forward its messages to the app logic.

    The socket is added to ``app['clients']``, sent the current state via
    ``notify_state`` and removed again when the connection ends.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    app = request.app
    log.info("Client connected.")
    clients = app['clients']
    clients.add(ws)
    notify_state(app, specific_client=ws)

    try:
        async for raw_msg in ws:
            if raw_msg.tp == aiohttp.MsgType.text:
                msg = api.Message.deserialize(raw_msg.data)
                log.info("User message: %s", msg)
                await handle_user_message(app, ws, msg)
            elif raw_msg.tp == aiohttp.MsgType.closed:
                break
            elif raw_msg.tp == aiohttp.MsgType.error:
                log.info("User websocket error: %s", raw_msg)
                break
            else:
                log.error("Unknown user message type: %s, ignoring.",
                          raw_msg.tp)
    finally:
        log.info("Client connection closed.")
        clients.remove(ws)

    return ws
def create_dispatch_stream_log(app):
    """Return a request handler that streams a vault log for ``app``.

    Args:
        app: object forwarded to ``ws_stream_log`` on every request.

    Returns:
        The ``dispatch_stream_log`` generator-based coroutine.
    """
    @asyncio.coroutine
    def dispatch_stream_log(request):
        # Route parameter is optional; the query-string limit defaults to 100.
        vault = request.match_info.get('vault_id')
        max_items = int(request.GET.get('limit', 100))
        # NOTE(review): this response is neither prepared here nor handed to
        # ws_stream_log - confirm that ws_stream_log does not expect it.
        ws = web.WebSocketResponse()
        yield from ws_stream_log(request, app, vault_id=vault, limit=max_items)
        return ws

    return dispatch_stream_log
async def get(self):
    """Chat endpoint: persist each text message and broadcast it to everyone.

    The user's login is resolved from the session. Join and leave notices
    are sent to the other registered sockets, and every text message other
    than ``'close'`` is saved and then broadcast.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(self.request)

    session = await get_session(self.request)
    user = User(self.request.db, {'id': session.get('user')})
    login = await user.get_login()

    websockets = self.request.app['websockets']
    for _ws in websockets:
        _ws.send_str('%s joined' % login)
    websockets.append(ws)

    try:
        async for msg in ws:
            if msg.tp == MsgType.text:
                if msg.data == 'close':
                    await ws.close()
                else:
                    message = Message(self.request.db)
                    result = await message.save(user=login, msg=msg.data)
                    log.debug(result)
                    for _ws in websockets:
                        _ws.send_str('(%s) %s' % (login, msg.data))
            elif msg.tp == MsgType.error:
                log.debug('ws connection closed with exception %s' % ws.exception())
    finally:
        # Unregister on every exit path so broadcasts never hit a dead socket.
        websockets.remove(ws)

    for _ws in websockets:
        _ws.send_str('%s disconected' % login)
    log.debug('websocket connection closed')
    return ws
async def handle(request):
    """Serve journal commands over a websocket.

    Text commands map onto ``JournalHandler`` methods: ``info``,
    ``history``, ``journal-sync-start`` and ``journal-sync-stop``. The
    ``close`` command closes the socket.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    peername = request.transport.get_extra_info('peername')
    host = port = "unknown"
    if peername is not None:
        host, port = peername[0:2]
    log.debug("web journal socket request from {}[{}]".format(host, port))

    ws = web.WebSocketResponse()
    await ws.prepare(request)
    jh = JournalHandler(ws)
    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                if msg.data == 'close':
                    await ws.close()
                    break
                if msg.data == 'info':
                    jh.sync_info()
                elif msg.data == 'history':
                    jh.sync_history()
                elif msg.data == 'journal-sync-start':
                    jh.sync_log()
                elif msg.data == 'journal-sync-stop':
                    jh.journal_sync_stop()
                else:
                    log.debug("unknown websocket command {}".format(str(msg.data)))
            elif msg.type == aiohttp.WSMsgType.ERROR:
                print('ws connection closed with exception %s' % ws.exception())
    finally:
        # Shut the handler down however the connection ended, not only on an
        # explicit 'close' command.
        await jh.shutdown()
    return ws
async def handle(request):
    """Serve resource-monitoring commands over a websocket.

    Text commands map onto ``ResourceHandler`` methods:
    ``start-cpu-utilization``, ``start-process-utilization`` and
    ``get-meminfo``. The ``close`` command closes the socket.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    peername = request.transport.get_extra_info('peername')
    host = port = "unknown"
    if peername is not None:
        host, port = peername[0:2]
    log.debug("web resource socket request from {}[{}]".format(host, port))

    ws = web.WebSocketResponse()
    await ws.prepare(request)
    jh = ResourceHandler(ws)
    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                if msg.data == 'close':
                    await ws.close()
                    break
                elif msg.data == 'start-cpu-utilization':
                    jh.sync_cpu_usage()
                elif msg.data == 'start-process-utilization':
                    jh.sync_process_utilization()
                elif msg.data == 'get-meminfo':
                    jh.get_meminfo()
                else:
                    log.debug("unknown websocket command: {}".format(msg.data))
            elif msg.type == aiohttp.WSMsgType.ERROR:
                print('ws connection closed with exception %s' % ws.exception())
    finally:
        # Shut the handler down however the connection ended, not only on an
        # explicit 'close' command.
        await jh.shutdown()
    return ws
async def subscriptions(self, request):
    """
    Handles requests for new subscription websockets.

    Args:
        request (aiohttp.Request): the incoming request

    Returns:
        aiohttp.web.WebSocketResponse: the websocket response, when the
            resulting websocket is closed. A plain 503 response is
            returned instead while this handler is not accepting.
    """
    if not self._accepting:
        return web.Response(status=503)

    web_sock = web.WebSocketResponse()
    await web_sock.prepare(request)

    try:
        async for msg in web_sock:
            if msg.type == aiohttp.WSMsgType.TEXT:
                await self._handle_message(web_sock, msg.data)
            elif msg.type == aiohttp.WSMsgType.ERROR:
                LOGGER.warning(
                    'Web socket connection closed with exception %s',
                    web_sock.exception())
                await web_sock.close()
    finally:
        # Drop the subscription even when message handling raised.
        await self._handle_unsubscribe(web_sock)

    return web_sock
async def websocket_handler(request):
    """Attach a websocket to a database notification channel.

    The channel name comes from the route (default
    ``'postgresql2websocket'``). A listener built by
    ``callback_websocket(ws)`` is registered on a pooled connection for as
    long as the socket stays open; incoming client messages are ignored.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    channel = request.match_info.get('channel', 'postgresql2websocket')
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    websockets = request.app['websockets']
    websockets.append(ws)
    try:
        async with request.app['pool'].acquire() as connection:
            # Keep a reference so the very same callback can be removed again.
            listener = callback_websocket(ws)
            await connection.add_listener(channel, listener)
            try:
                async for _ in ws:
                    pass
            finally:
                # NOTE(review): assumes an asyncpg-style connection that offers
                # remove_listener(channel, callback) - confirm.
                await connection.remove_listener(channel, listener)
    finally:
        # Covers failures while acquiring the connection as well.
        websockets.remove(ws)
    return ws
async def wshandler(request):
    """Greet text messages and echo binary ones until the client closes.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.type == web.MsgType.text:
            await ws.send_str("Hello,{}".format(msg.data))
        elif msg.type == web.MsgType.binary:
            await ws.send_bytes(msg.data)
        elif msg.type == web.MsgType.close:
            break

    return ws
def serve(port=8080, loop=None, handle_signals=True, **kwargs):
    """Run an aiohttp application that wraps each websocket in a Connection.

    Args:
        port: TCP port passed to ``run_app``.
        loop: event loop to use; defaults to ``get_event_loop()``.
        handle_signals: forwarded to ``run_app`` on aiohttp >= 2.2.
        **kwargs: forwarded to every ``Connection`` that is created.
    """
    if loop is None:
        loop = get_event_loop()
    conns = []

    @coroutine
    def handle(request):
        ws = WebSocketResponse()
        yield from ws.prepare(request)
        conn = Connection(ws, loop, **kwargs)
        conns.append(conn)
        try:
            yield from conn.wait_closed()
        finally:
            conns.remove(conn)
        return ws

    def on_shutdown(app):
        # Iterate over a snapshot: handlers remove entries from `conns`
        # as their connections close.
        for conn in list(conns):
            conn.close()

    def _leading_int(part):
        # "0b1" -> 0, "dev0" -> 0: pre-release suffixes must not break int().
        digits = ''
        for ch in part:
            if not ch.isdigit():
                break
            digits += ch
        return int(digits) if digits else 0

    aiohttp_ver = tuple(_leading_int(p) for p in aiohttp_version.split('.'))
    app = Application(**({} if aiohttp_ver >= (2,) else {'loop': loop}))
    app.router.add_route('GET', '/', handle)
    app.on_shutdown.append(on_shutdown)
    busywait = PeriodicCall(lambda: None, 1, loop)  # see comment for BlockingConnection.busywait
    try:
        kwargs_run = {}
        if aiohttp_ver >= (2,):
            kwargs_run['loop'] = loop
        if aiohttp_ver >= (2, 2):
            kwargs_run['handle_signals'] = handle_signals
        run_app(app, port=port, **kwargs_run)
    finally:
        busywait.cancel()
def __init__(self, request: web.Request):
    """Keep the request, start with no sessions and create the websocket response.

    Args:
        request: the request this object will serve.
    """
    self.ws = web.WebSocketResponse()
    # Values are (Any, Session) tuples keyed by string; empty at start.
    self.sessions: Dict[str, Tuple[Any, Session]] = {}
    self.request = request
def __init__(self, ws: web.WebSocketResponse, sid: str):
    """Bind the object to a websocket and an id, with an empty queue.

    Args:
        ws: websocket response stored on ``_ws``.
        sid: identifier stored on ``_id``.
    """
    self._id = sid
    self._ws = ws
    # Not finished yet and no default timeout configured.
    self.finished = False
    self.default_timeout = None
    self._queue = asyncio.Queue()
async def ws_handler(self, request):
    """Authenticate a websocket client via its session cookie and serve RPC calls.

    The session cookie is resolved to a login and a token; with a token a
    GitHub client is created and the ACL decides the admin/operator flags
    (by user name or by organisation). The client then receives
    ``authOk``, its own tasks and all current task updates. Afterwards
    each text message ``[method_name, args]`` is dispatched to
    ``self._ws_<method_name>`` when ``_check_acl`` allows it.

    NOTE(review): indentation was lost in the source; the nesting below is
    a reconstruction and should be checked against the original project.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    sid = request.cookies.get(self.config["cookie_name"])
    client = None
    admin = False
    operator = False
    login = None
    if sid:
        login = self.sessions.get(sid)
        if login:
            token = self.tokens.get(login).decode("ascii")
            if token:
                client = github.Client(token)

    if client:
        login = login.decode("utf8")
        if login in self.acl["admin"]["users"]:
            admin = True
        if login in self.acl["operator"]["users"]:
            operator = True
        orgs = await client.get("user/orgs")
        for org in orgs:
            if org["login"] in self.acl["admin"]["orgs"]:
                admin = True
            if org["login"] in self.acl["operator"]["orgs"]:
                operator = True

    user = {"login": login, "admin": admin, "operator": operator}
    self._ws_user_map[ws] = user
    try:
        ws.send_str(json.dumps(["authOk", user]))
        user_events = self._user_events[login]
        if user_events:
            ws.send_str(json.dumps(["userTasks", [e.id for e in user_events]]))
        for event in self.root.get_tasks():
            ws.send_str(json.dumps(["taskUpdate", event.to_dict()]))

        self.connections.append(ws)
        try:
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    method_name, args = json.loads(msg.data)
                    if not (await self._check_acl(method_name, client, operator, admin)):
                        ws.send_str(json.dumps(["accessDenied", method_name]))
                        continue
                    method = getattr(self, "_ws_" + method_name, None)
                    if method:
                        ret = await method(ws, *args)
                        if ret is not None:
                            ws.send_str(ret)
                    else:
                        LOG.info("Unknown websocket method %s", method_name)
                else:
                    LOG.debug("websocket msg %s %s", msg.type, msg)
        finally:
            self.connections.remove(ws)
    finally:
        # Bookkeeping must run even when a handler method raised.
        self._ws_user_map.pop(ws)
        self._disconnect_ws_console(ws)
    return ws
async def websocket_handler(request):
    """Serve the livereload websocket protocol for one browser.

    A ``hello`` command is answered with the server handshake when the
    client offers protocol ``official-7``, otherwise the socket is closed.
    An ``info`` command is logged; anything else is logged as an error.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    ws = web.WebSocketResponse()
    request.app[WS].append(ws)
    try:
        await ws.prepare(request)
        # Map message-type values back to their names for log output.
        ws_type_lookup = {k.value: v for v, k in aiohttp.MsgType.__members__.items()}

        async for msg in ws:
            if msg.tp == aiohttp.MsgType.text:
                try:
                    data = json.loads(msg.data)
                except json.JSONDecodeError as e:
                    logger.error('JSON decode error: %s', str(e))
                else:
                    command = data['command']
                    if command == 'hello':
                        if 'http://livereload.com/protocols/official-7' not in data['protocols']:
                            logger.error('live reload protocol 7 not supported by client %s', msg.data)
                            # close() is a coroutine; unawaited it never runs.
                            await ws.close()
                        else:
                            handshake = {
                                'command': 'hello',
                                'protocols': [
                                    'http://livereload.com/protocols/official-7',
                                ],
                                'serverName': 'livereload-aiohttp',
                            }
                            ws.send_str(json.dumps(handshake))
                    elif command == 'info':
                        logger.info('browser connected at %s', data['url'])
                        logger.debug('browser plugins: %s', data['plugins'])
                    else:
                        logger.error('Unknown ws message %s', msg.data)
            elif msg.tp == aiohttp.MsgType.error:
                logger.error('ws connection closed with exception %s', ws.exception())
            else:
                logger.error('unknown websocket message type %s, data: %s', ws_type_lookup[msg.tp], msg.data)

        # TODO gracefully close websocket connections on app shutdown
        logger.debug('browser disconnected')
    finally:
        # The socket was registered before prepare(); always take it out again.
        request.app[WS].remove(ws)
    return ws
def _ws_handler(self, request):
    """
    Handle websocket connections.

    This includes:
        * new connections
        * closed connections
        * messages

    Generator-based coroutine. A request whose session is new gets a 401
    payload and is closed. Otherwise the connect callbacks run, text
    messages are passed to the message callbacks, and the disconnect
    callbacks run once the socket closes.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    websocket = web.WebSocketResponse()
    yield from websocket.prepare(request)

    session = yield from get_session(request)
    if session.new:
        logger.debug('websocket: not logged in')
        websocket.send_str(json.dumps({'status': 401, 'text': "Unauthorized"}))
        # close() is a coroutine; without `yield from` it would never run.
        yield from websocket.close()
        return websocket

    self.websockets.append(websocket)
    for func in self.on_ws_connect:
        yield from func(websocket, session)

    while True:
        msg = yield from websocket.receive()
        if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSED):
            logger.debug('websocket closed')
            break

        logger.debug("websocket got: %s", msg)
        if msg.type == WSMsgType.TEXT:
            for func in self.on_ws_message:
                yield from func(websocket, session, msg.data)
        elif msg.type == WSMsgType.ERROR:
            logger.debug('websocket closed with exception %s', websocket.exception())

        yield from asyncio.sleep(0.1)

    self.websockets.remove(websocket)
    for func in self.on_ws_disconnect:
        yield from func(session)

    return websocket
### JRPC protocol ###
async def websocket_handler(request):
    """Serve the livereload websocket protocol and track the browser's URL.

    ``hello`` is answered with the server handshake when the client offers
    protocol ``official-7``. ``info`` records the page path and registers
    ``(ws, url)`` in ``request.app[WS]``; that entry is removed again when
    the socket ends.

    NOTE(review): the hello branch and the final error message were
    truncated in the source and are restored from the sibling livereload
    handler - confirm against the original project.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    ws = web.WebSocketResponse(timeout=0.01)
    url = None
    await ws.prepare(request)

    async for msg in ws:
        if msg.tp == WSMsgType.TEXT:
            try:
                data = json.loads(msg.data)
            except json.JSONDecodeError as e:
                aux_logger.error('JSON decode error: %s', str(e))
            else:
                command = data['command']
                if command == 'hello':
                    if 'http://livereload.com/protocols/official-7' not in data['protocols']:
                        aux_logger.error('live reload protocol 7 not supported by client %s', msg.data)
                        await ws.close()
                    else:
                        handshake = {
                            'command': 'hello',
                            'protocols': [
                                'http://livereload.com/protocols/official-7',
                            ],
                            'serverName': 'livereload-aiohttp',
                        }
                        ws.send_str(json.dumps(handshake))
                elif command == 'info':
                    aux_logger.debug('browser connected: %s', data)
                    # Keep only the path part of the URL the browser reported.
                    url = '/' + data['url'].split('/', 3)[-1]
                    request.app[WS].append((ws, url))
                else:
                    aux_logger.error('Unknown ws message %s', msg.data)
        elif msg.tp == WSMsgType.ERROR:
            aux_logger.error('ws connection closed with exception %s', ws.exception())
        else:
            aux_logger.error('unknown websocket message type %s, data: %s', WS_TYPE_LOOKUP[msg.tp], msg.data)

    if url is None:
        aux_logger.warning('browser disconnected,appears no websocket connection was made')
    else:
        aux_logger.debug('browser disconnected')
        request.app[WS].remove((ws, url))
    return ws
async def _handle(self, request: BaseRequest):
    """Serve one websocket client speaking ``[rid, command, data]`` JSON.

    Each text message is decoded into a request id, a command name and a
    payload. Every callback registered for the command in
    ``self._on_message`` is awaited with ``(ws, send_json, data)``; a
    non-None return value is sent back through ``send_json``. The literal
    text ``'ws.close'`` closes the socket.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    ws.request = request
    self.connections.add(ws)
    wsid = ws.headers['Sec-Websocket-Accept']
    logger.debug('websocket connected: %r,%d user(s) online' % (wsid, len(self.connections)))

    def send_json_wrap(rid, command):
        # Bind rid/command per message so replies carry the right request id.
        async def send_json(code, data=NotImplemented):
            if data is NotImplemented:
                data = RETCODE.txt_cn.get(code)
            val = {'code': code, 'data': data}
            logger.info('websocket reply %r - %s: %r' % (command, val, wsid))
            await ws.send_json([rid, val])
        return send_json

    async for msg in ws:
        if msg.type == aiohttp.WSMsgType.TEXT:
            if msg.data == 'ws.close':
                await ws.close()
            else:
                try:
                    # request id, command, data
                    rid, command, data = json.loads(msg.data)
                except (ValueError, TypeError):
                    # ValueError covers JSONDecodeError and a wrong item
                    # count; TypeError covers non-iterable payloads.
                    logger.error('websocket command parse failed %s: %r' % (msg.data, wsid))
                    continue

                send_json = send_json_wrap(rid, command)

                if command in self._on_message:
                    logger.info('websocket command %r - %s: %r' % (command, data, wsid))
                    for i in self._on_message[command]:
                        ret = await i(ws, send_json, data)
                        if ret is not None:
                            await send_json(*ret)
                else:
                    logger.info('websocket command not found %s: %r' % (command, wsid))
        elif msg.type == aiohttp.WSMsgType.ERROR:
            logger.debug('websocket connection closed with exception %s: %r' % (ws.exception(), wsid))
            break

    self.connections.remove(ws)
    await self.on_close(ws)
    logger.debug('websocket connection closed: %r,%d user(s) online' % (wsid, len(self.connections)))
    return ws
async def websocket_handler(request):
    """Serve the livereload websocket protocol and track the browser's URL.

    NOTE(review): large parts of this handler were lost in the source
    (the command dispatch and two string literals). The body below is
    rebuilt from the sibling livereload handlers and must be confirmed
    against the original project.

    Args:
        request: the incoming request to upgrade to a websocket.

    Returns:
        The ``web.WebSocketResponse`` created for this connection.
    """
    ws = web.WebSocketResponse()
    url = None
    await ws.prepare(request)
    # Map message-type values back to their names for log output.
    ws_type_lookup = {k.value: v for v, k in MsgType.__members__.items()}

    async for msg in ws:
        if msg.tp == MsgType.text:
            try:
                data = json.loads(msg.data)
            except json.JSONDecodeError as e:
                # Log the error itself: `data` is unbound when decoding fails.
                aux_logger.error('JSON decode error: %s', str(e))
            else:
                command = data['command']
                if command == 'hello':
                    if 'http://livereload.com/protocols/official-7' not in data['protocols']:
                        aux_logger.error('live reload protocol 7 not supported by client %s', msg.data)
                        await ws.close()
                    else:
                        handshake = {
                            'command': 'hello',
                            'protocols': [
                                'http://livereload.com/protocols/official-7',
                            ],
                            'serverName': 'livereload-aiohttp',
                        }
                        ws.send_str(json.dumps(handshake))
                elif command == 'info':
                    aux_logger.debug('browser connected: %s', data)
                    # Keep only the path part of the URL the browser reported.
                    url = '/' + data['url'].split('/', 3)[-1]
                    request.app[WS].append((ws, url))
                else:
                    aux_logger.error('Unknown ws message %s', msg.data)
        elif msg.tp == MsgType.error:
            aux_logger.error('ws connection closed with exception %s', ws.exception())
        else:
            aux_logger.error('unknown websocket message type %s, data: %s', ws_type_lookup[msg.tp], msg.data)

    aux_logger.debug('browser disconnected')
    if url:
        request.app[WS].remove((ws, url))
    return ws