国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

python對于requests的封裝方法詳解

2020-02-16 00:28:03
字體:
供稿:網(wǎng)友

由于requests是http類接口的核心,因此封裝前考慮問題比較多:

1. 對多種接口類型的支持;

2. 連接異常時能夠重連;

3. 并發(fā)處理的選擇;

4. 使用方便,容易維護;

當前并未全部實現(xiàn),后期會不斷完善。重點提一下并發(fā)處理的選擇:python的并發(fā)處理機制由于存在GIL的原因,實現(xiàn)起來并不是很理想,綜合考慮多進程、多線程、協(xié)程,在不考慮大并發(fā)性能測試的前提下使用了多線程-線程池的形式實現(xiàn)。使用的是

concurrent.futures模塊。當前僅方便支持webservice接口。

# -*- coding:utf-8 -*- import requestsfrom concurrent.futures import ThreadPoolExecutorfrom Tools.Config import Config # 配置文件讀取from Tools.Log import Log # 日志管理from Tools.tools import decoLOG # 日志裝飾 '''  功能:   Requests類  使用方法:   作者:   郭可昌  作成時間: 20180224  更新內(nèi)容:  更新時間:'''class Requests(object):  def __init__(self):    self.session = requests.session()    self.header = {}    # URL默認來源于配置文件,方便不同測試環(huán)境的切換,也可以動態(tài)設(shè)定    self.URL = Config().getURL()    # 默認60s,可以動態(tài)設(shè)定    self.timeout = 60    #http連接異常的場合,重新連接的次數(shù),默認為3,可以動態(tài)設(shè)定    self.iRetryNum = 3     self.errorMsg = ""    # 內(nèi)容 = {用例編號:響應(yīng)數(shù)據(jù)}    self.responses = {}    # 內(nèi)容 = {用例編號:異常信息}    self.resErr={}    # 原始post使用保留  # bodyData: request's data  @decoLOG  def post(self, bodyData):    response = None    self.errorMsg = ""     try:      response = self.session.post(self.URL, data=bodyData.encode('utf-8'), headers=self.header, timeout=self.timeout)      response.raise_for_status()    except Exception as e:      self.errorMsg = str(e)      Log().logger.error("HTTP請求異常,異常信息:%s" % self.errorMsg)    return response    # 復(fù)數(shù)請求并發(fā)處理,采用線程池的形式,用例數(shù)>線程池的容量:線程池的容量為并發(fā)數(shù),否則,用例數(shù)為并發(fā)數(shù)  # dicDatas: {用例編號:用例數(shù)據(jù)}  @decoLOG  def req_all(self, dicDatas, iThreadNum=5):     if len(dict(dicDatas)) < 1:      Log().logger.error("沒有測試對象,請確認后再嘗試。。。")      return self.responses.clear()     # 請求用例集合轉(zhuǎn)換(用例編號,用例數(shù)據(jù))    seed = [i for i in dicDatas.items()]    self.responses.clear()     # 線程池并發(fā)執(zhí)行,iThreadNum為并發(fā)數(shù)    with ThreadPoolExecutor(iThreadNum) as executor:      executor.map(self.req_single,seed)     # 返回所有請求的響應(yīng)信息({用例編號:響應(yīng)數(shù)據(jù)}),http連接異常:對應(yīng)None    return self.responses   # 用于單用例提交,http連接失敗可以重新連接,最大重新連接數(shù)可以動態(tài)設(shè)定  def req_single(self, listData, reqType="post", iLoop=1):    response = None    # 如果達到最大重連次數(shù),連接后提交結(jié)束    if iLoop == self.iRetryNum:      if reqType == "post":        try:          response = requests.post(self.URL, data=listData[1].encode('utf-8'), headers=self.header,                       timeout=self.timeout)          response.raise_for_status()        except Exception as e:          # 異常信息保存只在最大連接次數(shù)時進行,未達到最大連接次數(shù),異常信息為空          self.resErr[listData[0]] = str(e)          Log().logger.error("HTTP請求異常,異常信息:%s【%d】" % (str(e), iLoop))         self.responses[listData[0]] = response      else:        # for future: other request method expand        pass    # 未達到最大連接數(shù),如果出現(xiàn)異常,則重新連接嘗試    else:      if reqType == "post":        try:          response = requests.post(self.URL, data=listData[1].encode('utf-8'), headers=self.header,                       timeout=self.timeout)          response.raise_for_status()        except Exception as e:          Log().logger.error("HTTP請求異常,異常信息:%s【%d】" % (str(e), iLoop))          # 重連次數(shù)遞增          iLoop += 1          # 進行重新連接          self.req_single(listData, reqType, iLoop)          # 當前連接終止          return None        self.responses[listData[0]] = response      else:        # for future: other request method expand        pass   # 設(shè)定SoapAction, 快捷完成webservice接口header設(shè)定  def setSoapAction(self, soapAction):    self.header["SOAPAction"] = soapAction    self.header["Content-Type"] = "text/xml;charset=UTF-8"    self.header["Connection"] = "Keep-Alive"    self.header["User-Agent"] = "InterfaceAutoTest-run"             
發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 灵武市| 金阳县| 大洼县| 景德镇市| 安康市| 密云县| 固镇县| 工布江达县| 达日县| 南川市| 紫阳县| 江达县| 石泉县| 百色市| 温州市| 静海县| 普定县| 当阳市| 高尔夫| 南澳县| 栾城县| 马鞍山市| 黔江区| 西城区| 白山市| 固始县| 临邑县| 随州市| 安福县| 巴中市| 临高县| 沾益县| 都江堰市| 怀化市| 嘉定区| 司法| 大宁县| 新晃| 曲周县| 泰兴市| 德清县|