国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

python3實現ftp服務功能(服務端 For Linux)

2020-02-23 04:29:02
字體:
來源:轉載
供稿:網友

本文實例為大家分享了python3實現ftp服務功能的具體代碼,供大家參考,具體內容如下

功能介紹:

可執行的命令:

ls
pwd
cd
put
rm
get
mkdir

1、用戶加密認證

2、允許多用戶同時登陸

3、每個用戶有自己的家目錄,且只可以訪問自己的家目錄

4、運行在自己家目錄下隨意切換目錄

5、允許上傳下載文件,且文件一致

6、傳輸過程中顯示進度條
server main 代碼:

# Author by Andy# _*_ coding:utf-8 _*_import os, sys, json, hashlib, socketserver, timebase_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))sys.path.append(base_dir)from conf import userdb_setclass Ftp_server(socketserver.BaseRequestHandler): user_home_dir = '' def auth(self, *args):  '''驗證用戶名及密碼'''  cmd_dic = args[0]  username = cmd_dic["username"]  password = cmd_dic["password"]  f = open(userdb_set.userdb_set(), 'r')  user_info = json.load(f)  if username in user_info.keys():   if password == user_info[username]:    self.request.send('0'.encode())    os.chdir('/home/%s' % username)    self.user_home_dir = os.popen('pwd').read().strip()    data = "%s login successed" % username    self.loging(data)   else:    self.request.send('1'.encode())    data = "%s login failed" % username    self.loging(data)    f.close  else:   self.request.send('1'.encode())   data = "%s login failed" % username   self.loging(data)   f.close   ########################################## def get(self, *args):  '''給客戶端傳輸文件'''  request_code = {   '0': 'file is ready to get',   '1': 'file not found!'  }  cmd_dic = args[0]  self.loging(json.dumps(cmd_dic))  filename = cmd_dic["filename"]  if os.path.isfile(filename):   self.request.send('0'.encode('utf-8')) # 確認文件存在   self.request.recv(1024)   self.request.send(str(os.stat(filename).st_size).encode('utf-8'))   self.request.recv(1024)   m = hashlib.md5()   f = open(filename, 'rb')   for line in f:    m.update(line)    self.request.send(line)   self.request.send(m.hexdigest().encode('utf-8'))   print('From server:Md5 value has been sended!')   f.close()  else:   self.request.send('1'.encode('utf-8'))   ########################################### def cd(self, *args):  '''執行cd命令'''  user_current_dir = os.popen('pwd').read().strip()  cmd_dic = args[0]  self.loging(json.dumps(cmd_dic))  path = cmd_dic['path']  if path.startswith('/'):   if self.user_home_dir in path:    os.chdir(path)    new_dir = os.popen('pwd').read()    user_current_dir = new_dir    self.request.send('Change dir successfully!'.encode("utf-8"))    data = 'Change dir successfully!'    self.loging(data)   elif os.path.exists(path):    self.request.send('Permission Denied!'.encode("utf-8"))    data = 'Permission Denied!'    self.loging(data)   else:    self.request.send('Directory not found!'.encode("utf-8"))    data = 'Directory not found!'    self.loging(data)  elif os.path.exists(path):   os.chdir(path)   new_dir = os.popen('pwd').read().strip()   if self.user_home_dir in new_dir:    self.request.send('Change dir successfully!'.encode("utf-8"))    user_current_dir = new_dir    data = 'Change dir successfully!'    self.loging(data)   else:    os.chdir(user_current_dir)    self.request.send('Permission Denied!'.encode("utf-8"))    data = 'Permission Denied!'    self.loging(data)  else:   self.request.send('Directory not found!'.encode("utf-8"))   data = 'Directory not found!'   self.loging(data)   ########################################### def rm(self, *args):  request_code = {   '0': 'file exist,and Please confirm whether to rm',   '1': 'file not found!'  }  cmd_dic = args[0]  self.loging(json.dumps(cmd_dic))  filename = cmd_dic['filename']  if os.path.exists(filename):   self.request.send('0'.encode("utf-8")) # 確認文件存在   client_response = self.request.recv(1024).decode()   if client_response == '0':    os.popen('rm -rf %s' % filename)    self.request.send(('File %s has been deleted!' % filename).encode("utf-8"))    self.loging('File %s has been deleted!' % filename)   else:    self.request.send(('File %s not deleted!' % filename).encode("utf-8"))    self.loging('File %s not deleted!' % filename)  else:   self.request.send('1'.encode("utf-8"))   ######################################## def pwd(self, *args):  '''執行pwd命令'''  cmd_dic = args[0]  self.loging(json.dumps(cmd_dic))  server_response = os.popen('pwd').read().strip().encode("utf-8")  self.request.send(server_response) ############################################# def ls(self, *args):  '''執行ls命名'''  cmd_dic = args[0]  self.loging(json.dumps(cmd_dic))  path = cmd_dic['path']  cmd = 'ls -l %s' % path  server_response = os.popen(cmd).read().encode("utf-8")  self.request.send(server_response) ############################################ def put(self, *args):  '''接收客戶端文件'''  cmd_dic = args[0]  self.loging(json.dumps(cmd_dic))  filename = cmd_dic["filename"]  filesize = cmd_dic["size"]  if os.path.isfile(filename):   f = open(filename + '.new', 'wb')  else:   f = open(filename, 'wb')  request_code = {   '200': 'Ready to recceive data!',   '210': 'Not ready to received data!'  }  self.request.send('200'.encode())  receive_size = 0  while True:   if receive_size < filesize:    data = self.request.recv(1024)    f.write(data)    receive_size += len(data)   else:    data = "File %s has been uploaded successfully!" % filename    self.loging(data)    print(data)    break    ################################################ def mkdir(self, *args):  request_code = {   '0': 'Directory has been made!',   '1': 'Directory is aleady exist!'  }  cmd_dic = args[0]  self.loging(json.dumps(cmd_dic))  dir_name = cmd_dic['dir_name']  if os.path.exists(dir_name):   self.request.send('1'.encode("utf-8"))  else:   os.popen('mkdir %s' % dir_name)   self.request.send('0'.encode("utf-8"))   #############################################  def loging(self, data):  '''日志記錄'''  localtime = time.asctime(time.localtime(time.time()))  log_file = '/root/ftp/ftpserver/log/server.log'  with open(log_file, 'a', encoding='utf-8') as f:   f.write('%s-->' % localtime + data + '/n')   ############################################## def handle(self):  # print("您本次訪問使用的IP為:%s" %self.client_address[0])  # localtime = time.asctime( time.localtime(time.time()))  # print(localtime)  while True:   try:    self.data = self.request.recv(1024).decode() #    # print(self.data)    cmd_dic = json.loads(self.data)    action = cmd_dic["action"]    # print("用戶請求%s"%action)    if hasattr(self, action):     func = getattr(self, action)     func(cmd_dic)   except Exception as e:    self.loging(str(e))    breakdef run(): HOST, PORT = '0.0.0.0', 6969 print("The server is started,and listenning at port 6969") server = socketserver.ThreadingTCPServer((HOST, PORT), Ftp_server) server.serve_forever()if __name__ == '__main__': run()            
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 徐汇区| 左贡县| 手游| 襄汾县| 丰镇市| 于田县| 宝清县| 绵竹市| 巴彦淖尔市| 桑日县| 舒兰市| 岳阳县| 交城县| 合水县| 萝北县| 左贡县| 营山县| 昂仁县| 安福县| 星子县| 建昌县| 建始县| 清苑县| 定边县| 花莲县| 两当县| 龙江县| 泰州市| 定安县| 台东市| 乌兰县| 田林县| 仪陇县| 汤原县| 三门县| 秀山| 望城县| 枝江市| 高台县| 永康市| 舒城县|