问题描述
我有一个无限的python-watchdog进程(用supervisor
守护),当在文件夹中创建文件时会触发新事件。
另一端是一个异步队列,只有3个使用者,等待创建的文件的新消息(使用者然后将文件上传到服务器)。
下面的代码可以工作,但是是连续的,我只是不明白为什么(尽管我阅读了很多有关asyncio的文档)。
main.py(守护程序):
paths_observer = PathsObserver()
if __name__ == "__main__":
paths_observer.start()
try:
while paths_observer.is_alive():
paths_observer.join(1)
except KeyboardInterrupt:
paths_observer.stop()
paths_observer.join()
pathobserver.py(简体,用于记录要观看的文件夹):
from watchdog.observers import Observer
class PathsObserver(Observer):
def observe_folder(self,folder_path: str) -> None:
event_handler = DataFileHandler(path=folder_path)
watch = self.schedule(event_handler,folder_path,recursive=True)
filehandler.py:注意调度程序的导入
from watchdog.events import FileCreatedEvent,FileSystemEventHandler
from .scheduler import scheduler
class DataFileHandler(FileSystemEventHandler):
def on_created(self,event):
scheduler.prepare_and_upload(event.src_path)
scheduler.py:
MAX_TASKS = 3
class UploadScheduler(object):
def __init__(self):
self._loop = asyncio.get_event_loop()
self._queue = asyncio.Queue(0,loop=self._loop)
self._consumers = [asyncio.ensure_future(self._consumer(self._queue)) for _ in range(MAX_TASKS)]
async def _consumer(self,queue):
while True:
file_path = await queue.get()
await <do some stuff with file_path>
queue.task_done()
async def _producer(self,file_path):
await self._queue.put(file_path)
def prepare_and_upload(self,file_path):
self._loop.run_until_complete(self._producer(file_path))
scheduler = UploadScheduler()
如果我删除了self._loop.run_until_complete
,则消费者不会获取任务,并且有一条消息表明producer
尚未等待。如果我将self._queue.put_no_wait(file_path)
放在非异步函数中,则消费者作业也不会开始。
我做错了什么?
我所期望的:创建多个文件时,将调用一堆新的生产者。而且由于消费者需要花费一些时间来上传所有内容,因此他们会慢慢赶上。整个过程保持不变,并等待新文件上传。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)