一个基于selectors开发ftp并发上传下载文件小程序

理论基础:

         IO多路复用之selectors模块


服务端代码:


import os

import socket

import selectors

import json

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

buffer_size = 1024

STATUS_CODE = {

    700: '路径存在',

    701: '路径不存在,请重新新输入',

    702: "文件名已存在,请重命名文件",

    800: "文件已存在,但是大小不够,是否继续上传",

    801: "文件已存在",

    802: "准备发送文件",

    803: "准备接受数据",

    804: "文件不存在",

}

class SelectFtpServer:

    def __init__(self):

        self.dic = {}

        self.sel = selectors.DefaultSelector()

        self.create_socket()

        self.handle()

    def create_socket(self):

        server = socket.socket()

        server.bind(("127.0.0.1",8885))

        server.listen(5)

        server.setblocking(False)

        self.sel.register(server,selectors.EVENT_READ,self.accept)

        print("服务端开启,等待用户连接...")

    def handle(self):

        while True:

            events = self.sel.select()

            for key,mask in events:

                callback = key.data

                callback(key.fileobj,mask)

    def accept(self,sock,mask):

        conn,addr = sock.accept()

        print("{}连接成功".format(addr))

        conn.setblocking(False)

        self.sel.register(conn,self.read)

        self.dic[conn] = {}

        # print(self.dic)

    def read(self,conn,mask):

        try:

            if not self.dic[conn]:

                data = json.loads(conn.recv(buffer_size).decode("utf-8"))

                if data:

                    cmd = data["action"]

                    file_name = data["file_name"]

                    file_size = data['file_size']

                    self.dic[conn] = {"action": cmd,"file_name": file_name,"file_size": file_size,"has_recv": 0}

                    if cmd == 'put':

                        data = {

                            "status_code": 802

                        }

                        conn.send(json.dumps(data).encode())

                    elif cmd == 'get':

                        file_path = os.path.join(BASE_DIR,"upload",file_name)

                        # print(file_path)

                        if os.path.exists(file_path):

                            file_size = os.path.getsize(file_path)

                            data = {

                                "status_code": 803,

                                "file_size": file_size

                            }

                        else:

                            data = {

                                "status_code": 804,

                            }

                        conn.send(json.dumps(data).encode())

                    else:

                        pass

                else:

                    self.sel.unregister(conn)

                    conn.close()

            else:

                cmd = self.dic[conn].get("action",None)

                if cmd:

                    if hasattr(self,cmd):

                        func = getattr(self,cmd)

                        func(conn)

                    else:

                        print("错误的命令!")

                        conn.close()

                else:

                    print("错误的命令!")

                    conn.close()

        except ConnectionResetError as e:

            print('error',e)

            self.sel.unregister(conn)

            conn.close()

    def put(self,conn):

        file_size = self.dic[conn]['file_size']

        path = os.path.join(BASE_DIR,self.dic[conn]['file_name'])

        recv_data = conn.recv(buffer_size)

        self.dic[conn]['has_recv'] += len(recv_data)

        with open(path,'ab') as f:

            f.write(recv_data)

            if self.dic[conn]['has_recv'] >= file_size:

                print("%s上传完毕!" % self.dic[conn]['file_name'])

                print(self.dic[conn])

                self.dic[conn] = {}

    def get(self,conn):

        file_name = self.dic[conn]['file_name']

        path = os.path.join(BASE_DIR,file_name)

        if json.loads(conn.recv(buffer_size).decode("utf-8"))["status_code"] == 802:

            with open(path,'rb') as f:

                for line in f:

                    conn.send(line)

            self.dic[conn] = {}

            print('文件下载完毕!')

            # print(self.dic[conn])

if __name__ == '__main__':

    SelectFtpServer()

客户端代码:


import socket

import os,sys

import json

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

buffer_size = 1024

class SelectFtpClient:

    def __init__(self):

        self.args = sys.argv

        if len(self.args) > 1:

            self.port = (self.args[1],int(self.args[2]))

        else:

            self.port = ("127.0.0.1",8885)

        self.create_socket()

        self.command()

    def create_socket(self):

        try:

            self.sk = socket.socket()

            self.sk.connect(self.port)

            print('连接FTP服务器成功!')

        except Exception as e:

            print("error: ",e)

    def command(self):

        while True:

            cmd = input('>>>').strip()

            if cmd == 'exit()':

                print("结束运行")

                break

            try:

                cmd,file = cmd.split()

                if hasattr(self,cmd):

                    func = getattr(self,cmd)

                    func(cmd,file)

                else:

                    print('调用错误!')

            except ValueError:

                print("请正确输入命令")

    def put(self,cmd,file):

        if os.path.isfile(file):

            file_name = os.path.basename(file)

            file_size = os.path.getsize(file)

            data = {

                "action": cmd,

                "file_name": file_name,

                "file_size": file_size

            }

            self.sk.send(json.dumps(data).encode())

            recv_data = json.loads(self.sk.recv(buffer_size).decode("utf-8"))

            print('recvStatus',recv_data)

            has_send = 0

            if recv_data['status_code'] == 802:

                with open(file,'rb') as f:

                    while file_size > has_send:

                        contant = f.read(1024)

                        self.sk.send(contant)

                        has_send += len(contant)

                        self.progress_bar(has_send,file_size)

                print('\n%s文件上传完毕' % (file_name,))

        else:

            print('文件不存在')

    def get(self,file_name):

        data = {

            "action": cmd,

            "file_name": file_name,

            "file_size": 0

        }

        self.sk.send(json.dumps(data).encode())

        rec_data = json.loads(self.sk.recv(buffer_size).decode("utf-8"))

        if rec_data["status_code"] == 803:

            data = {

                "status_code": 802

            }

            self.sk.send(json.dumps(data).encode("utf-8"))

            path = os.path.join(BASE_DIR,file_name)

            has_recv = 0

            with open(path,'wb') as f_write:

                while has_recv < rec_data['file_size']:

                    content = self.sk.recv(buffer_size)

                    has_recv += len(content)

                    f_write.write(content)

                    self.progress_bar(has_recv,rec_data['file_size'])

                print('\nfile %s 下载完成!' % file_name)

        else:

            print("文件不存在!")

    def progress_bar(self,f_size,total):

        percent = round(f_size / total * 100)

        str = '*' * (percent // 2) + ' ' * ((100 - percent) // 2)

        print('\r' + str + ' {}%'.format(percent),end='',flush=True,)

if __name__ == '__main__':

    SelectFtpClient()


相关文章

使用OpenCV实现视频去抖 整体步骤: 设置输入输出视频 寻找帧...
前言 对中文标题使用余弦相似度算法和编辑距离相似度分析进行...
前言 之前尝试写过一个爬虫,那时对网页请求还不够熟练,用的...
前言 本文使用Python实现了PCA算法,并使用ORL人脸数据集进行...
前言 使用opencv对图像进行操作,要求:(1)定位银行票据的...
天气预报API 功能 从中国天气网抓取数据返回1-7天的天气数据...