Python Watchdog - 检查文件是否已经存在

问题描述

我一直在与 Watchdogs 合作,但我遇到了一个问题,例如我意识到我是否有一个位于正在检查的文件夹中的文件等:./flags/start_2314.json 然后我想启动脚本,它不会检查文件夹是否已经存在任何文件,然后执行我有 on_created 的逻辑部分

*kill_*.json & *start_*.json contains:

{"store": "twitch","link": "https://www.twitch.tv/xqcow","id": "2710"}


# !/usr/bin/python3
# -*- coding: utf-8 -*-

import json
import os
import os.path
import re
import sys
import time

import watchdog.events
import watchdog.observers
from loguru import logger
from watchdog.events import PatternMatchingEventHandler

import lib.database as database
import lib.get_all_paths as path



def start_script(**get_info):
    logger.info(f'Starting -> Will add more')
    delete_from_database(name=get_info["name"],link=get_info["link"])
    return


# -------------------------------------------------------------------------
# Remove the url from database
# -------------------------------------------------------------------------
def delete_from_database(**get_info):
    database.delete_manual_links(get_info["name"],get_info["link"])
    return


class DeleteAutomatic(PatternMatchingEventHandler):

    def __init__(self):
        self.observer = watchdog.observers.Observer()
        self.observer.schedule(
            self,path=path.get_path_flags(),recursive=True
        )
        super().__init__(
            [
                path.get_automatic_kill_flag(),#"*kill_*.json"
                path.get_automatic_restart_flag(),#"*start_*.json"
                path.get_manual_flag()              #"*new_manual_url*.json"
            ]
        )

    def __enter__(self):
        self.observer.start()

    def __exit__(self,exc_type,exc_val,exc_tb):
        self.observer.stop()
        self.observer.join()

    def on_created(self,event):
        while True:
            try:

                time.sleep(0.1)
                with open(event.src_path) as f:
                    payload = json.load(f)

                if "start" in event.src_path:
                    start_script(name=payload["name"],link=payload["link"],id=payload["id"])

                elif "kill" in event.src_path:
                    logger.info(f'Deleting -> Will add more')
                    delete_from_database(name=payload["name"],link=payload["link"])

                else:
                    logger.info(f'Detected New URL...')

                    for product in database.get_all_manual_links():
                        name = product["name"]
                        link = re.sub(r'\s+','',product["link"])

                        if not database.link_exists(name,link):
                            database.register_products(name,product_info)
                            start_script(name=name,link=link)

                        else:
                            logger.info(f'Product already monitored: {name} | Link: {link}')
                            delete_from_database(name=name,link=link)

                subprocess.run(split('pm2 save'))
                break

            except Exception as err:  # Need to be changed to something more specific
                logger.debug(f"Error -> {err}")
                time.sleep(1)

        while True:
            try:
                os.remove(event.src_path)
                break
            except Exception as err:  # Need to be changed to something more specific
                logger.debug(f"Error deleting-> {err}")
                time.sleep(1)


with DeleteAutomatic():
    while True:
        time.sleep(1)

我的问题是,在我开始文件夹中的文件包含 "*kill_*.json","*start_*.json" and "*new_manual_url*.json" 之前,我想使用与 def on_created(self,event): 中相同的逻辑,然后我会继续检查已创建的新文件

我想知道这是否可行?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)