使用 asyncio 处理队列仅在所有项目使用 python 中的 asyncio 放入队列后触发

问题描述

我正在尝试在 python 中创建一些代码,将来自生成器的数据(当前是一个简单的计数循环,但在某些时候将是传感器数据)并将其放入队列中。一旦进入队列,我想从中提取数据并通过 TCP 连接发送它。这是使用 asyncio 的好时机,但我做错了。

目前,脚本将处理所有数字并且不返回任何内容。理想情况下,我想确保队列中有一些东西,所以它永远不会清空,并且每次都发送一定数量的数据,比如 5 个数字。我怎样才能做到这一点?

import asyncio
import random

class responder():
    
    def __init__(self,parent=None):    
        super().__init__()



    async def produce(self,queue,n):
        for x in range(n):
            # produce an item
            print('producing {}/{}'.format(x,n))
            # simulate I/O operation using sleep
            await asyncio.sleep(random.random())
            item = str(x)
            # put the item in the queue
            await queue.put(item)

    async def consume(self,queue):
        while True:
            # wait for an item from the producer
            item = await queue.get()

            # process the item
            print('consuming {}...'.format(item))
            # simulate I/O operation using sleep
            await asyncio.sleep(random.random())

            # Notify the queue that the item has been processed
            queue.task_done()

    async def run(self,n):
        queue = asyncio.Queue()
        # schedule the consumer
        self.consumer = asyncio.ensure_future(self.consume(queue))
        # run the producer and wait for completion
        await self.produce(queue,n)
        # wait until the consumer has processed all items
        await queue.join()
        # the consumer is still awaiting for an item,cancel it
        self.consumer.cancel()

    async def handle_echo(self,reader,writer):
        data = await reader.read(100)
        message = data.decode()
        addr = writer.get_extra_info('peername')
        print("Received %r from %r" % (message,addr))
        if (message == 'START_RUN'):
            data = await self.run(10) 
            print("Send: %i" % data)
            writer.write(data)
            await writer.drain()
        else: 
            print("Send: %r" % message)
            writer.write(message)
            await writer.drain()

        print("Close the client socket")
        writer.close()

    def launch_server(self):
        self.loop = asyncio.get_event_loop()
        self.coro = asyncio.start_server(self.handle_echo,'127.0.0.1',7780,loop=self.loop)
        self.server = self.loop.run_until_complete(self.coro)

        # Serve requests until Ctrl+C is pressed
        print('Serving on {}'.format(self.server.sockets[0].getsockname()))
        try:
            self.loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            # Close the server
            self.server.close()
            self.loop.run_until_complete(self.server.wait_closed())
            self.loop.close()

def main():
    server = responder()
    server.launch_server()

if __name__ == '__main__':
    main()

代码生成数字流,但在继续之前它会遍历整个列表。此外,我永远不会得到回报。

我的客户端代码(永远不会得到任何东西)

import asyncio


async def capture_stream(reader):

    while not reader.at_eof:
        data = await reader.read(100)
        print( f'{who} received {len(data)} bytes' )


async def tcp_echo_client(message,loop):
    reader,writer = await asyncio.open_connection('127.0.0.1',loop=loop)
    
    print('Send: %r' % message)
    writer.write(message.encode())

    if (message == "START_RUN"):
        data = await reader.read(100)
        print('Received: %r' % data.decode())
    else:
        collect_data = asyncio.create_task(capture_stream)
        data = await collect_data


    print('Close the socket')
    writer.close()
    
message = 'START_RUN'
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_echo_client(message,loop))
loop.close()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...