# coding:utf-8
import os

# Directory two levels above this file; the JSON stores below live under it.
_BASE_DIR = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))

res_path = _BASE_DIR + '/common' + '/config' + '/res.json'
res_global_path = _BASE_DIR + '/common' + '/config' + '/res_global.json'
res_alias_json = _BASE_DIR + '/common' + '/config' + '/res_alias.json'

import json
import copy
import time

from common.processingJson import write_data, get_json
from common.logger import Log
from common.config import testDate
from lib.public import get_extract, get_param, replace_var, extract_variables, \
    get_param_reponse, call_interface, format_url
import urllib3
from base.models import Case
from lib.public import format_request

urllib3.disable_warnings()
log = Log()  # module-level logger

# HTML wrapper used when pretty-printing JSON payloads to stdout.
_PRE_OPEN = ('<div style="max-height:350px;overflow:scroll">'
             '<pre style="white-space: pre-wrap;word-wrap: break-word;">')
_PRE_CLOSE = '</pre></div>'


def _print_heading(title):
    """Print a green, bold HTML heading that labels one section of the output."""
    print('<div style="color:green"><strong>' + title + '</strong></div>')


def _print_pretty_json(value):
    """Print ``value`` as indented JSON wrapped in a scrollable ``<pre>`` block."""
    print(_PRE_OPEN + json.dumps(value, indent=2, ensure_ascii=False) + _PRE_CLOSE)


def send_requests(s, data_dict_D):
    """Send the request described by ``data_dict_D`` and collect the outcome.

    The header, params, body, extract rules and validators are not taken from
    ``data_dict_D`` itself: they are read from the stored ``Case`` whose
    ``case_name`` matches, using the entry of its JSON ``content`` whose
    ``if_id`` equals ``data_dict_D['if_id']``.

    Side effects: prints HTML fragments (url, method, body, response) to
    stdout, logs the response text, and, when extract rules exist, writes the
    extracted values to ``res_path`` through ``write_data``.

    :param s: passed unchanged as the first argument of ``call_interface``
        (presumably an HTTP session object; confirm against ``lib.public``).
    :param data_dict_D: dict providing ``case_name``, ``if_id``, ``method``,
        ``url``, ``data_type`` and ``if_name``. It is deep-copied first, so
        the caller's object is never modified.
    :return: a dict. On success it holds ``if_id``, ``test_name``,
        ``status_code`` (as ``str``), ``text`` and ``checkpoint``. If anything
        raises once the request phase has started, it holds whatever was
        stored so far plus ``msg`` (the error text) and ``result`` set to
        ``"error"``.
    :raises TypeError: if ``data_dict_D`` is not a dict.
    :raises IndexError: if the stored case has no entry with a matching
        ``if_id``.
    """
    data_dict = copy.deepcopy(data_dict_D)
    if not isinstance(data_dict, dict):  # reject anything that is not a dict
        raise TypeError('{} 参数不是字典类型'.format(send_requests.__name__))

    # The authoritative request definition comes from the database record.
    case_from_sql = Case.objects.get(case_name=data_dict['case_name'])
    new_dict = [item for item in json.loads(case_from_sql.content)
                if str(item['if_id']) == str(data_dict['if_id'])][0]

    method = data_dict["method"]  # HTTP method
    url = format_request(data_dict["url"])  # request address
    data_type = data_dict["data_type"]  # request parameter type
    test_name = data_dict['if_name']  # display name of the interface under test
    headers = format_request(new_dict['header'])
    params = format_request(new_dict['params'])
    body = format_request(new_dict['body'])

    if json.dumps(body).startswith('['):  # the body serialises to a JSON array
        # NOTE(review): a body that serialises to '[' is a list/tuple and has
        # no .items(), so the original loop raised AttributeError before the
        # request was ever sent. The loop now only runs when the object really
        # offers .items(); plain list bodies are passed on unchanged. Confirm
        # this is the intended handling of list bodies.
        # SECURITY: eval() executes arbitrary expressions taken from stored
        # case content; consider ast.literal_eval if only literals are needed.
        if hasattr(body, 'items'):
            for key, value in body.items():
                body[key] = eval(value)

    url, body = format_url(url, body)

    _print_heading('url')
    print('<div>' + url + '</div>')
    _print_heading('Method')
    print('<div>' + method.upper() + '</div>')
    if body:
        _print_heading('Body')
        try:
            _print_pretty_json(body)
        except Exception:
            # Body is not JSON-serialisable: fall back to its plain string form.
            print(str(body))

    extract = new_dict['extract']
    if_id = data_dict['if_id']
    # Validators come from the stored case, not from data_dict["checkpoint"]
    # (changed by liangfenglong).
    checkpoint = new_dict['validators']
    res = {}  # collects everything reported back to the caller

    try:
        # Build and send the request.
        r = call_interface(s, method, url, headers, params, body, data_type)

        _print_heading('Response')
        try:
            _print_pretty_json(r.json())
        except Exception:
            # Response is not JSON (or cannot be rendered): show the object.
            print(str(r))

        response_text = r.content.decode("utf-8")  # decode once, use twice
        log.info("页面返回信息:%s" % response_text)

        # Store the basic response information.
        res['if_id'] = if_id
        res['test_name'] = test_name
        res["status_code"] = str(r.status_code)  # status code kept as str
        res["text"] = response_text

        # Parse the response a single time for all extract rules and
        # validators instead of once per entry. It is only parsed when
        # something needs it, so a non-JSON response with no rules still
        # succeeds exactly as before.
        response_json = r.json() if (extract or checkpoint) else None

        if extract:
            extract_dict = {}
            for item in extract:
                extract_dict[item] = get_param_reponse(extract[item], response_json)
            write_data(extract_dict, res_path)

        # Replace each validator's 'check' expression with the value found
        # in the response.
        for item in checkpoint:
            item['check'] = get_param_reponse(item['check'], response_json)
        res['checkpoint'] = checkpoint
        return res
    except Exception as msg:
        log.error('请求出现异常! {}'.format(msg))
        res["msg"] = str(msg)  # keep the error message
        res["result"] = "error"  # mark the outcome as an error
        print("这里是msg%s" % res)
        return res


def replace_params(params, data_dict, params_type=0):
    """Resolve context-dependent variables and return the requested part.

    Variable names are found in ``params`` with ``extract_variables``. For
    each one, its value is looked up with ``get_json`` in ``res_global_path``
    and then ``res_path``, substituted into ``data_dict`` with
    ``replace_var``, and the result is parsed back with ``json.loads``.

    :param params: value scanned for variable references; returned unchanged
        when it contains none.
    :param data_dict: structure the substitutions are applied to.
    :param params_type: selects what is returned after substitution:
        ``0`` gives ``data_dict['header']``; ``1`` gives
        ``data_dict.get('body', {})`` (or ``{}`` when ``data_dict`` is empty);
        any other value is used directly as a key into ``data_dict``.
    :return: the selected part of the substituted ``data_dict``, or ``params``
        itself when no variables were found.
    """
    var_list = extract_variables(params)
    if not var_list:
        return params

    for var_name in var_list:
        # modify by -liangfenglong
        # Lookup order: res_global_path, then res_path.
        # 2019-07-24: res_alias_json is not consulted here; it is only needed
        # in get_extract.
        var_value, _alias_flag = get_json((res_global_path, res_path), var_name)
        data_dict = json.loads(replace_var(data_dict, var_name, var_value))

    log.info(data_dict)
    if params_type == 0:
        log.info('headers 替换依赖参数: {}'.format(data_dict['header']))
        return data_dict['header']
    if params_type == 1:
        body = data_dict.get('body', {}) if data_dict else {}
        log.info('body 替换上下文依赖参数: {}'.format(body))
        return body

    log.info('{} 替换依赖参数: {}'.format(params_type, data_dict[params_type]))
    return data_dict[params_type]


if __name__ == "__main__":
    pass