-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
- Loading branch information
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
congig:配置文档 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
# -*- coding: utf-8 -*- | ||
""" | ||
测试用例设计 | ||
""" | ||
import json | ||
import os | ||
from InterfaceBase.base import RequestApi | ||
from Public.assertion import assert_in | ||
from Public.get_excel import dataParsing | ||
from Public.log import logger, LOG | ||
from config.config import * | ||
|
||
# filepath = os.path.dirname(os.getcwd())+'\\data\\case.xlsx' | ||
filepath = os.getcwd()+'\\data\\case.xlsx' | ||
|
||
list_id, list_name, list_method, list_url, list_key,list_params, list_anticipate = dataParsing(filepath) | ||
# print(list_id, listname, list_method, list_url, list_key,list_params, list_anticipate) | ||
|
||
@logger("用例测试") | ||
def caseInterface(): | ||
list_pass = 0 #成功用例数量 | ||
list_fail = 0 #失败用例数量 | ||
list_return_json = [] #返回json列表 | ||
list_result_status = [] #返回状态 | ||
list_unknown = 0 #未知错误数量 | ||
list_abnormal = 0 #异常用例(abnormal:异常) | ||
error_num = 0 | ||
for listurl,listparams,listmethod,listanticipate in zip(list_url,list_params,list_method,list_anticipate): | ||
while error_num <= Config_Try_Num: | ||
method = str(listmethod) #将请求方法转换为字符串,必须 | ||
params = json.loads(listparams) #将请求参数转换为json格式,必须 | ||
# 打印接口请求所有参数格式,url为字符串,参数为字典json格式,方法为字符串 | ||
# print((type(TestPlanUrl + listurl)), type(params), type(method)) | ||
#调用InterfaceBase.base.py文件中的RequestApi类,开始测试 | ||
api = RequestApi(url = TestPlanUrl+listurl,params = params, method=method ) | ||
api_json=api.getJson() #获取接口返回的json数据 | ||
assert_re = assert_in(anticipate=listanticipate, return_json=api_json) | ||
print(assert_re) | ||
if api_json['code'] == 0: | ||
LOG.info('case:code=0> 参数:%s, url:%s ,返回:%s,预期:%s' % (list_params, list_url, api_json, list_anticipate)) | ||
# print("预期",list_anticipate) | ||
assert_re = assert_in(anticipate=listanticipate,return_json=api_json) | ||
if assert_re['code'] == 0: | ||
list_return_json.append(api_json['return_result_json']) | ||
list_result_status.append('pass') | ||
list_pass += 1 | ||
error_num = 0 | ||
break | ||
elif assert_re['code'] == 1: | ||
if error_num <= Config_Try_Num: | ||
error_num += 1 | ||
LOG.info("code=1 失败重试中") | ||
else: | ||
LOG.info("code=1 失败重试次数用完,最后结果") | ||
error_num = 0 | ||
list_fail += 1 | ||
list_result_status.append('fail') | ||
list_return_json.append(api_json['return_result_json']) | ||
break | ||
elif assert_re['code'] == 2: | ||
if error_num < Config_Try_Num: | ||
error_num += 1 | ||
LOG.info('code=2 失败重试中') | ||
else: | ||
LOG.info("code=2 失败重试次数用完,最后结果") | ||
error_num = 0 | ||
list_abnormal += 1 | ||
list_result_status.append(assert_re['return_result_json']) | ||
break | ||
else: | ||
if error_num < Config_Try_Num: | ||
error_num += 1 | ||
LOG.info('无code,失败重试中') | ||
else: | ||
LOG.info('无code,失败重试次数用完,最后结果') | ||
error_num = 0 | ||
list_unknown += 1 | ||
list_result_status.append('无code,未知错误') | ||
list_return_json.append('无code,未知错误') | ||
break | ||
else: | ||
if error_num <Config_Try_Num: | ||
error_num += 1 | ||
LOG.info('最终失败重试中') | ||
else: | ||
LOG.info('最终失败,失败重试中次数用完,最后结果') | ||
error_num = 0 | ||
list_abnormal += 1 | ||
list_result_status.append('exception') | ||
list_return_json.append(api_json['result']) | ||
break | ||
return list_result_status,list_fail,list_pass,list_return_json,list_abnormal,list_unknown | ||
|
||
if __name__ == "__main__": | ||
A = caseInterface() | ||
print(A) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
""" | ||
@file: 钉钉发送消息封装 | ||
""" | ||
'''封装钉钉群发消息''' | ||
import requests,json | ||
from config.config import Dingtalk_access_token | ||
|
||
def send_ding(content): | ||
url = Dingtalk_access_token | ||
pagrem = { | ||
"msgtype": "text", | ||
"text": { | ||
"content": content | ||
}, | ||
"isAtAll": True | ||
} | ||
headers = { | ||
'Content-Type': 'application/json' | ||
} | ||
f = requests.post(url, data=json.dumps(pagrem), headers=headers) | ||
if f.status_code==200: | ||
return True | ||
else: | ||
return False |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
# -*- coding: utf-8 -*- | ||
""" | ||
request第二次封装,引用“from InterfaceBase.requests_base import request” | ||
""" | ||
|
||
from InterfaceBase.requests_base import request | ||
request = request() | ||
|
||
class RequestApi(object): | ||
|
||
def __init__(self, url, params, method): | ||
self.url = url #接口请求url | ||
# self.key = key #接口请求key值 | ||
self.param = params #接口连接方式 | ||
self.rehquest_metod = method #接口请求方式,post、get、put、delete | ||
|
||
def testapi(self): | ||
if self.rehquest_metod == "POST": | ||
# print("POST") | ||
self.response = request.post(self.url, self.param) | ||
elif self.rehquest_metod == "GET": | ||
self.response = request.get(url=self.url, params=self.param) | ||
elif self.rehquest_metod == "DELETE": | ||
self.response = request.delete(url=self.url, params=self.param) | ||
return self.response | ||
|
||
def getJson(self): | ||
json_data = self.testapi() | ||
return json_data | ||
|
||
if __name__ == "__main__": | ||
|
||
api = RequestApi(url="http://XXX/Account/Login", params={"hnAccount":"1158984565456","hnPassword":"123456"}, method="POST") | ||
# api.testapi() | ||
api_json = api.getJson() | ||
print(api_json) | ||
print("第二次请求接口") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
# -*- coding:utf8 -*- | ||
""" | ||
requests第一次封装 | ||
整个测试框架的基础 | ||
""" | ||
import requests,json | ||
from Public.log import LOG,logger | ||
|
||
headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.10; rv:51.0) Gecko/20100101 Firefox/51.0"} | ||
|
||
@logger("requests封装文件:request_base.py") | ||
class request(object): | ||
def __init__(self): | ||
# 自动加载浏览器请求头 | ||
self.headers = headers | ||
|
||
#get请求方法,已调试成功 | ||
def get(self,url,params): | ||
""" | ||
:param url: 完整的请求url | ||
:param params: 完整的请求参数 | ||
:return:返回值 “0” 代表接口可以连接,“1”代表无效接口连接 | ||
""" | ||
#将传入的接口参数,转换为json格式,可能无需转换,先调试 | ||
params_json = json.dumps(params) | ||
try: | ||
''' | ||
也可以使用这种方式请求 | ||
response = requests.request("GET", url, headers=self.headers, params=params) | ||
''' | ||
LOG.info("GET开始请求接口") | ||
response = requests.get(url, headers=self.headers, params=params_json) #post方法请求接口 | ||
LOG.info("GET接口请求成功,状态:%s" % response) | ||
response.encoding = "UTF-8" #此处需要设置GET返回信息的字符集 | ||
reponse_json = json.loads(response.text) | ||
LOG.info("接口返回json信息:%s"%reponse_json) | ||
return {'code':0,'return_result_json':reponse_json} | ||
except Exception as error: | ||
LOG.info('GET请求出错(位置requests的模块第一次封装,请求接口有误),出错原因:%s'%error) | ||
return {'code':1,'return_result_json':'post请出错(位置requests的模块第一次封装,请求接口有误),出错原因:%s'%error} | ||
|
||
#post请求方法,已调试成功 | ||
def post(self,url,params): | ||
""" | ||
:param url: 完整的请求url | ||
:param params: 完整的请求参数 | ||
:return:返回值 “0” 代表接口可以连接,“1”代表无效接口连接 | ||
""" | ||
try: | ||
''' | ||
也可以使用这种方式请求 | ||
response = requests.request("POST", url, headers=self.headers, params=params) | ||
''' | ||
LOG.info("POST开始请求接口") | ||
response = requests.post(url, headers=self.headers, params=params) #post方法请求接口 | ||
LOG.info("POST接口请求成功,状态:%s" % response) | ||
reponse_json = json.loads(response.text) | ||
LOG.info("接口返回json信息:%s"%reponse_json) | ||
return {'code':0,'return_result_json':reponse_json} | ||
except Exception as error: | ||
LOG.info('POST请求出错(位置requests的模块第一次封装,请求接口有误),出错原因:%s'%error) | ||
return {'code':1,'return_result_json':'post请出错(位置requests的模块第一次封装,请求接口有误),出错原因:%s'%error} | ||
|
||
# delete请求方法,已调试成功 | ||
def delete(self, url, params): | ||
""" | ||
:param url: 完整的请求url | ||
:param params: 完整的请求参数 | ||
:return:返回值 “0” 代表接口可以连接,“1”代表无效接口连接 | ||
""" | ||
# 将传入的接口参数,转换为json格式,可能无需转换,先调试 | ||
params_json = json.dumps(params) | ||
try: | ||
''' | ||
也可以使用这种方式请求 | ||
response = requests.request("DELETE", url, headers=self.headers, params=params) | ||
''' | ||
LOG.info("DELETE开始请求接口") | ||
response = requests.delete(url, headers=self.headers, params=params_json) # post方法请求接口 | ||
LOG.info("DELETE接口请求成功,状态:%s" % response) | ||
reponse_json = json.loads(response.text) | ||
LOG.info("接口返回json信息:%s" % reponse_json) | ||
return {'code': 0, 'return_result_json': reponse_json} | ||
except Exception as error: | ||
LOG.info('DELETE请求出错(位置requests的模块第一次封装,请求接口有误),出错原因:%s' % error) | ||
return {'code': 1, 'return_result_json': 'post请出错(位置requests的模块第一次封装,请求接口有误),出错原因:%s' % error} | ||
|
||
|
||
|
||
#未知原因:使用程序入口,无发打印信息 | ||
if __name__ == "__main__": #不使用入口 | ||
#post请求调试 | ||
url = "http:/Account/Login" | ||
Interface_parameters = {"hnAccount": "16985652453", "hnPassword": "123456"} | ||
r = request() | ||
a = r.post(url,Interface_parameters) | ||
print("requests第一次封装") | ||
|