问题描述
我正在尝试使用asyncio和Unix域套接字在Python中构建玩具内存Redis服务器。
我的最小示例仅针对每个请求返回值baz
:
import asyncio
class RedisServer:
def __init__(self):
self.server_address = "/tmp/redis.sock"
async def handle_req(self,reader,writer):
await reader.readline()
writer.write(b"$3\r\nbaz\r\n")
await writer.drain()
writer.close()
await writer.wait_closed()
async def main(self):
server = await asyncio.start_unix_server(self.handle_req,self.server_address)
async with server:
await server.serve_forever()
def run(self):
asyncio.run(self.main())
RedisServer().run()
当我使用以下脚本用the redis
client library测试两个顺序的客户端请求时,它会起作用:
import time
import redis
r = redis.Redis(unix_socket_path="/tmp/redis.sock")
r.get("foo")
time.sleep(1)
r.get("bar")
但是,如果我删除了time.sleep(1)
,有时它会起作用,并且有时第二个请求失败会失败,其中之一:
Traceback (most recent call last):
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py",line 706,in send_packed_command
sendall(self._sock,item)
File "/tmp/env/lib/python3.8/site-packages/redis/_compat.py",line 9,in sendall
return sock.sendall(*args,**kwargs)
brokenPipeError: [Errno 32] broken pipe
During handling of the above exception,another exception occurred:
Traceback (most recent call last):
File "test.py",in <module>
r.get("bar")
File "/tmp/env/lib/python3.8/site-packages/redis/client.py",line 1606,in get
return self.execute_command('GET',name)
File "/tmp/env/lib/python3.8/site-packages/redis/client.py",line 900,in execute_command
conn.send_command(*args)
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py",line 725,in send_command
self.send_packed_command(self.pack_command(*args),File "/tmp/env/lib/python3.8/site-packages/redis/connection.py",line 717,in send_packed_command
raise ConnectionError("Error %s while writing to socket. %s." %
redis.exceptions.ConnectionError: Error 32 while writing to socket. broken pipe.
或者:
Traceback (most recent call last):
File "test.py",line 901,in execute_command
return self.parse_response(conn,command_name,**options)
File "/tmp/env/lib/python3.8/site-packages/redis/client.py",line 915,in parse_response
response = connection.read_response()
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py",line 739,in read_response
response = self._parser.read_response()
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py",line 324,in read_response
raw = self._buffer.readline()
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py",line 256,in readline
self._read_from_socket()
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py",line 201,in _read_from_socket
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.
似乎我的实现缺少客户端库期望的一些关键行为(可能是由于异步)。我想念什么?
解决方法
如果要在每次请求后关闭套接字,则需要使用write_eof()
,
在刷新缓冲的写数据之后,关闭流的写结束。
请参阅 docs.python.org -> write_eof
您的代码进行了稍微修改,如下所示:
async def handle_req(self,reader,writer):
await reader.readline()
writer.write(b"$3\r\nbaz\r\n")
await writer.drain()
writer.write_eof()
writer.close()
await writer.wait_closed()
通常,您不会在每次请求后关闭套接字。
以下示例仅用于说明目的,旨在表明不需要关闭套接字。当然,您总是会读一行,然后根据Redis协议解释数据。我们知道在这里发送了两个GET命令(每行5行,带有2个元素的数组的指示符,字符串的指示符,字符串值'GET',再次是字符串指示符和对应的值,即键)
async def handle_req(self,writer):
print("start")
for i in range(0,2):
for x in range(0,5):
print(await reader.readline())
writer.write(b"$3\r\nbaz\r\n")
await writer.drain()
writer.write_eof()
writer.close()
await writer.wait_closed()
在客户端上发送是这样完成的:
print(r.get("foo"))
print(r.get("bar"))
time.sleep(1)
最后一次睡眠是为了确保客户端不会立即退出。
控制台上的输出为:
start
b'*2\r\n'
b'$3\r\n'
b'GET\r\n'
b'$3\r\n'
b'foo\r\n'
b'*2\r\n'
b'$3\r\n'
b'GET\r\n'
b'$3\r\n'
b'bar\r\n'
请注意,start
仅输出一次,这表明我们可以处理多个请求而不必立即关闭套接字。