0.FastApi+WebSocket框架

XActivity

from fastapi import FastAPI, WebSocket,WebSocketdisconnect
from starlette.middleware.cors import CORSMiddleware
origins = ["*", ]
class XActivity:
    def __init__(self):
        self.app = FastAPI()
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=origins,
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"])
        self.init_base()
        self.init_utils()
        self.init_fragment()

    def init_fragment(self):
        pass

    def init_base(self):
        pass

    def init_utils(self):
        pass

XFragment

class XFragment:
    def __init__(self,app,name):
        self.name = name
        self.run(app)

    def get_name(self, tag):
        mind = "{}/{}".format(self.name, tag)
        if mind[0] != "/":
            mind = "/" + mind
        return mind

    def run(self, app):
        raise Exception

XMaster

from fastapi import FastAPI, WebSocket, WebSocketdisconnect
from redis import Redis
import asyncio, time, gc, sys, gc
from .XFragment import XFragment
from pydantic import BaseModel
from dataclasses import dataclass
from ..XUtils.XUtils import XUta


class LoginItem(BaseModel):
    user: str
    password: str


class R:
    def __init__(self, db):
        self.r = Redis(db=db, decode_responses=True)
        self.p = self.r.pipeline()
        self.r.flushdb()


class R0(R):
    def __init__(self):
        R.__init__(self, 5)


class R1(R):
    def __init__(self):
        R.__init__(self, 4)

    def iscrea(self, aka, dt, cand):
        data = MasterItem(aka, dt, cand).show()
        self.p.multi()
        self.p.hmset(aka, data)
        self.p.expire(aka, 60)
        self.p.execute()
        return SlaveItem(aka, dt).show()

    def iscand(self, aka):
        return self.r.hget(aka, "cand")


class Mind:
    def show(self):
        return self.__dict__


@dataclass()
class SlaveItem(Mind):
    aka: str
    dt: str


@dataclass()
class MasterItem(SlaveItem):
    cand: str


@dataclass()
class AntItem(Mind):
    cand: str
    dt: str
    por: str


class XMaster(XFragment, XUta):
    def __init__(self, app: FastAPI):
        XFragment.__init__(self, app, "main")
        self.r0 = R0()
        self.r1 = R1()

    def run(self, app):
        self._websocket(app)
        self._login(app)

    def _login(self, app):
        @app.post(self.get_name("login"))
        async def login(item: LoginItem):
            return self.login(item.user, item.password)

    def add(self, func, id, seconds=1):
        self.s0.add_job(func, 'interval', seconds=seconds, id=id)

    def login(self, user, password):
        raise Exception

    def _websocket(self, app: FastAPI):
        raise Exception

    def keeplive(self, cand, por):
        return self.r0.r.hget(cand, "por") == por

    def crea(self, cand, aka):
        if self.r0.r.exists(cand) == 1:
            self.kill(cand)
        self.r1.r.delete(aka)
        por = self.get_uid()
        data = AntItem(cand, self.get_dt(), por).show()
        self.r0.p.multi()
        self.r0.p.hmset(cand, data)
        self.r0.p.expire(cand, 60)
        self.r0.p.execute()
        return por

XSlave

from fastapi import WebSocket,WebSocketdisconnect
import asyncio,os,sys
from ..XUtils.XUtils import XUta
class XSlave(XUta):
    def __init__(self,websocket: WebSocket,aka,cand,por):
        XUta.__init__(self)
        self.websocket = websocket
        self.cand = cand
        self.aka = aka
        self.por = por
        self.keep = True

    def shark(self,msg):
        raise Exception

    def startlight(self):
        pass

    def shark(self,msg):
        print(msg)

    def keeplive(self,cand,por):
        raise Exception

XUtils

import time, datetime, os, shutil
from uuid import uuid4
class XUta:

    def get_dt(self, t=True):
        dt = datetime.datetime.Now().replace(microsecond=0)
        if t:
            dt = str(dt)
        return dt

    def get_uid(self):
        return uuid4().hex

    def make_dt(self, date):
        return datetime.datetime.strptime(date, "%Y-%m-%d %H:%M:%s")


class XUtb:

    def new_bag(self, path):
        if os.path.exists(path) == False:
            os.makedirs(path)
        return path

    def new_file(self, old_path, new_path, cur_path):
        if os.path.exists(old_path):
            os.remove(old_path)
        shutil.copy(cur_path, new_path)

    def __ispath(self, path, na):

        ge = os.path.exists(path)
        if ge:
            return path
        else:
            if na:
                raise Exception

    def hard_path(self, path):
        self.__ispath(path, True)

    def simp_path(self, path):
        return self.__ispath(path, False)


class XUtils(XUta, XUtb):
    def __init__(self):
        XUta.__init__(self)
        XUtb.__init__(self)

相关文章

显卡天梯图2024最新版,显卡是电脑进行图形处理的重要设备,...
初始化电脑时出现问题怎么办,可以使用win系统的安装介质,连...
todesk远程开机怎么设置,两台电脑要在同一局域网内,然后需...
油猴谷歌插件怎么安装,可以通过谷歌应用商店进行安装,需要...
虚拟内存这个名词想必很多人都听说过,我们在使用电脑的时候...