问题描述
我正在尝试在 python 中创建一些代码,将来自生成器的数据(当前是一个简单的计数循环,但在某些时候将是传感器数据)并将其放入队列中。一旦进入队列,我想从中提取数据并通过 TCP 连接发送它。这是使用 asyncio 的好时机,但我做错了。
目前,脚本将处理所有数字并且不返回任何内容。理想情况下,我想确保队列中有一些东西,所以它永远不会清空,并且每次都发送一定数量的数据,比如 5 个数字。我怎样才能做到这一点?
import asyncio
import random
class responder():
def __init__(self,parent=None):
super().__init__()
async def produce(self,queue,n):
for x in range(n):
# produce an item
print('producing {}/{}'.format(x,n))
# simulate I/O operation using sleep
await asyncio.sleep(random.random())
item = str(x)
# put the item in the queue
await queue.put(item)
async def consume(self,queue):
while True:
# wait for an item from the producer
item = await queue.get()
# process the item
print('consuming {}...'.format(item))
# simulate I/O operation using sleep
await asyncio.sleep(random.random())
# Notify the queue that the item has been processed
queue.task_done()
async def run(self,n):
queue = asyncio.Queue()
# schedule the consumer
self.consumer = asyncio.ensure_future(self.consume(queue))
# run the producer and wait for completion
await self.produce(queue,n)
# wait until the consumer has processed all items
await queue.join()
# the consumer is still awaiting for an item,cancel it
self.consumer.cancel()
async def handle_echo(self,reader,writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print("Received %r from %r" % (message,addr))
if (message == 'START_RUN'):
data = await self.run(10)
print("Send: %i" % data)
writer.write(data)
await writer.drain()
else:
print("Send: %r" % message)
writer.write(message)
await writer.drain()
print("Close the client socket")
writer.close()
def launch_server(self):
self.loop = asyncio.get_event_loop()
self.coro = asyncio.start_server(self.handle_echo,'127.0.0.1',7780,loop=self.loop)
self.server = self.loop.run_until_complete(self.coro)
# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(self.server.sockets[0].getsockname()))
try:
self.loop.run_forever()
except KeyboardInterrupt:
pass
finally:
# Close the server
self.server.close()
self.loop.run_until_complete(self.server.wait_closed())
self.loop.close()
def main():
server = responder()
server.launch_server()
if __name__ == '__main__':
main()
代码生成数字流,但在继续之前它会遍历整个列表。此外,我永远不会得到回报。
我的客户端代码(永远不会得到任何东西)
import asyncio
async def capture_stream(reader):
while not reader.at_eof:
data = await reader.read(100)
print( f'{who} received {len(data)} bytes' )
async def tcp_echo_client(message,loop):
reader,writer = await asyncio.open_connection('127.0.0.1',loop=loop)
print('Send: %r' % message)
writer.write(message.encode())
if (message == "START_RUN"):
data = await reader.read(100)
print('Received: %r' % data.decode())
else:
collect_data = asyncio.create_task(capture_stream)
data = await collect_data
print('Close the socket')
writer.close()
message = 'START_RUN'
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_echo_client(message,loop))
loop.close()
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)