How do I process incoming data so it can be merged by key later?

Problem description

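I wrote this code to build a non-blocking manager with pipeline operations using asyncio. My main concern is capturing the items received from the producers and knowing when the receive operation has finished. If the keys match, I want to merge the records and return them together. However, I'm unsure where the final data should be joined, in the producer or in the consumer, because the current workflow looks like this: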

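  1. Scrape all databases (multiple clients) (mocked)

  2. Push to the manager (a proxy-level server); multiple clients send their data to the manager

  3. Merge the multiple data sources into one list, keyed on the incoming data, with no database operations, e.g. {"ID-2002-0201": {"id":"ID-2002-0201","updated_at":"2018-05-14T22:25:51Z","html_url":"xxxxxxxxxxxx"}} > probably the producer

  4. Use get_or_create (check the database, and create the record if none with that data exists) > consumer

  5. Create the data in bulk (when the number of data sources grows from 2 to 100+, the data can be split into smaller chunks to scale) > consumer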
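Steps 3 and 5 above can be sketched without any asyncio or database code. This is only an illustration under assumptions: `merge_by_key` and `chunked` are hypothetical helper names, every payload is assumed to have the `{key: record}` shape shown in step 3, and when two sources share a key the fields of the later one are assumed to win.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List

Record = Dict[str, Any]


def merge_by_key(payloads: Iterable[Dict[str, Record]]) -> Dict[str, Record]:
    """Fold several {key: record} payloads into one dict (hypothetical helper)."""
    merged: Dict[str, Record] = {}
    for payload in payloads:
        for key, record in payload.items():
            # Assumption: on a key match, later fields overwrite earlier ones.
            merged.setdefault(key, {}).update(record)
    return merged


def chunked(records: Dict[str, Record], size: int) -> Iterator[List[Record]]:
    """Yield the merged records in lists of at most `size` items (step 5)."""
    iterator = iter(records.values())
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch
```

With two sources that both report `ID-2002-0201`, `merge_by_key` returns a single record holding the union of their fields, and `chunked(merged, 50)` would hand the consumer batches of up to 50 records for a bulk insert.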

server.py

#!/usr/bin/env python3
import asyncio
import logging
import random
from pipeline_async import Pipeline

class A:
    def __init__(self):
        pass

    def run(self):
        return {"ID-2002-0201":{"id":"ID-2002-0201","html_url":"xxxxxxxxxxxx"}}

class B:
    def __init__(self):
        pass

    def run(self):
        return {"ID-2002-0202":{"id":"ID-2002-0202","html_url":"xxxxxxxxxxxx"}}

class Manager:

    async def producer(self, pipeline, data_sources):
        """Pretend we're getting a number from the network."""
        for data_stream in data_sources:
            # fetch once so the same payload is queued and logged
            message = data_stream.run()
            await pipeline.set_message(message, "Producer")
            logging.info("Producer got message: %s", message)

    async def consumer(self,pipeline):
        """ Pretend we're saving a number in the database. """
        while True:
            # wait for an item from the Producer
            message = await pipeline.get_message("Consumer")
            # process the msg
            logging.info(
                "Consumer storing message: %s",message
            )
            # simulate I/O operation using sleep
            await asyncio.sleep(random.random())
            pipeline.task_done()

    async def start(self):
        pipeline = Pipeline()
        data_sources = [A(),B()]
        # schedule the consumer
        consume = asyncio.create_task(self.consumer(pipeline))
        # run the producer and wait for completion
        await self.producer(pipeline, data_sources)
        # wait until the consumer has processed all items
        await pipeline.join()
        # the consumer is still waiting for an item, so cancel it
        consume.cancel()
        logging.info("Successfully shut down the service.")

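if __name__ == '__main__':
    # the default log level is WARNING, so enable INFO to see the messages above
    logging.basicConfig(level=logging.INFO)
    asyncio.run(Manager().start())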

pipeline_async.py

import asyncio
import logging


class Pipeline(asyncio.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    async def get_message(self,name):
        logging.debug("%s:about to get from queue",name)
        value = await self.get()
        logging.debug("%s:got %s from queue",name,value)
        return value

    async def set_message(self, value, name):
        # pass both name and value so the two %s placeholders are filled
        logging.debug("%s:about to add %s to queue", name, value)
        await self.put(value)
        print(name, value)
        logging.debug("%s:added %s to queue", name, value)

I'd appreciate it if you could point out any cases I've missed.
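On the question of where to join the data, one possible arrangement (a sketch, not a confirmed answer) is to keep the producer as a plain forwarder and let the consumer fold each payload into a shared dict, so the merged result is complete once the queue's `join()` returns. The names `run` and `merged` are illustrative, a bare `asyncio.Queue` stands in for `Pipeline`, and the rule that later fields overwrite earlier ones on a key match is an assumption.

```python
import asyncio
from typing import Any, Dict, List

Record = Dict[str, Any]


async def producer(queue: asyncio.Queue, payloads: List[Dict[str, Record]]) -> None:
    """Forward each source's payload unchanged; no merging happens here."""
    for payload in payloads:
        await queue.put(payload)


async def consumer(queue: asyncio.Queue, merged: Dict[str, Record]) -> None:
    """Fold every payload into `merged`, keyed by record ID."""
    while True:
        payload = await queue.get()
        try:
            for key, record in payload.items():
                # Assumption: on a key match, later fields overwrite earlier ones.
                merged.setdefault(key, {}).update(record)
        finally:
            # Always mark the item done so queue.join() can return.
            queue.task_done()


async def run(payloads: List[Dict[str, Record]]) -> Dict[str, Record]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)
    merged: Dict[str, Record] = {}
    worker = asyncio.create_task(consumer(queue, merged))
    await producer(queue, payloads)
    await queue.join()   # every queued payload has been merged at this point
    worker.cancel()      # the consumer is idle on get(); stop it
    await asyncio.gather(worker, return_exceptions=True)
    return merged


if __name__ == "__main__":
    sources = [
        {"ID-2002-0201": {"id": "ID-2002-0201", "html_url": "xxxxxxxxxxxx"}},
        {"ID-2002-0201": {"id": "ID-2002-0201", "updated_at": "2018-05-14T22:25:51Z"}},
    ]
    print(asyncio.run(run(sources)))
```

Merging in the consumer means the producers never need to know about each other, and `get_or_create` or a bulk insert can run on the returned dict after `join()`. Merging in the producer would work as well, but only if all sources are available before anything is queued.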
