Python异步:如何将生产者/消费者模式链接到看门狗进程?

问题描述

我有一个无限的python-watchdog进程(用supervisor守护),当在文件夹中创建文件时会触发新事件。

另一端是一个异步队列,只有3个使用者,等待创建的文件的新消息(使用者然后将文件上传到服务器)。

下面的代码可以工作,但是是连续的,我只是不明白为什么(尽管我阅读了很多有关asyncio的文档)。

main.py(守护程序):

paths_observer = PathsObserver()

if __name__ == "__main__":
    paths_observer.start()

    try:
        while paths_observer.is_alive():
            paths_observer.join(1)
    except KeyboardInterrupt:
        paths_observer.stop()

    paths_observer.join()

pathobserver.py(简体,用于记录要观看的文件夹):

from watchdog.observers import Observer

class PathsObserver(Observer):
    def observe_folder(self,folder_path: str) -> None:
        event_handler = DataFileHandler(path=folder_path)
        watch = self.schedule(event_handler,folder_path,recursive=True)

filehandler.py:注意调度程序的导入

from watchdog.events import FileCreatedEvent,FileSystemEventHandler

from .scheduler import scheduler

class DataFileHandler(FileSystemEventHandler):
    def on_created(self,event):        
        scheduler.prepare_and_upload(event.src_path)

scheduler.py:

MAX_TASKS = 3

class UploadScheduler(object):
    def __init__(self):
        self._loop = asyncio.get_event_loop()
        self._queue = asyncio.Queue(0,loop=self._loop)
        self._consumers = [asyncio.ensure_future(self._consumer(self._queue)) for _ in range(MAX_TASKS)]

    async def _consumer(self,queue):
        while True:
            file_path = await queue.get()
            await <do some stuff with file_path>
            queue.task_done()

    async def _producer(self,file_path):
        await self._queue.put(file_path)

    def prepare_and_upload(self,file_path):
        self._loop.run_until_complete(self._producer(file_path))


scheduler = UploadScheduler()

如果我删除self._loop.run_until_complete,则消费者不会获取任务,并且有一条消息表明producer尚未等待。如果我将self._queue.put_no_wait(file_path)放在非异步函数中,则消费者作业也不会开始。

我做错了什么?

我所期望的:创建多个文件时,将调用一堆新的生产者。而且由于消费者需要花费一些时间来上传所有内容,因此他们会慢慢赶上。整个过程保持不变,并等待新文件上传

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)