Commit adca9c14 authored by zhanglu's avatar zhanglu Committed by JackieZhang

接口检测工具

parents
gm_apidesc.egg-info/*
dist/*
build/*
*.py[co]
\ No newline at end of file
## 针对请求是的参数检测
* 1、针对输入进行检测
* 2、针对输出进行检测
使用方法:
```python
from ... import APIBaseView
from gm_apidesc.types import SenceType
from gm_apidesc import schema
from gm_apidesc import types
class API(APIBaseView):
@schema({
"type": SenceType.DJANGO, # 使用场景,没有默认普通函数
'param': { # 入参检测,必填
"id": types.Int, # 可以仅仅是某个类型
"user_id": { # 字典,则针对这个参数做比较多的扩展
"type": types.BigInt,
"null": True, # null 默认为 False,表示可以为空
"required": False, # 默认必传
}
},
'return': {}, # TODO: 返回值检测,可以不填写
'desc': "sadhfjklhasd" # TODO:函数说明,后续用户文档生成,可以不写
})
def get(self, request):
pass
```
\ No newline at end of file
from gm_apidesc.schema import schema
from gm_apidesc.types import SenceType
from gm_apidesc.types import G_IntegerType
@schema({
# "type": SenceType.DJANGO, # 使用场景,没有默认普通函数
'param': { # 入参检测,必填
"id": G_IntegerType, # 可以仅仅是某个类型
"user_id": { # 字典,则针对这个参数做比较多的扩展
"type": G_IntegerType,
"null": True, # null 默认为 False,表示可以为空
"required": False, # 默认必传
}
},
'return': {}, # TODO: 返回值检测,可以不填写
'desc': "sadhfjklhasd" # TODO:函数说明,后续用户文档生成,可以不写
})
def get(id, user_id):
pass
get(1, "12")
from .check import *
from .types import *
__version__ = '0.0.1'
"""文档生成。"""
#
# TODO
#
\ No newline at end of file
from .check import *
from .metatype import MetaObject, _BaseMeta
def django_input_check(checker, args, kwargs):
args_dict, req_params = combine_args(args, kwargs, checker)
# 参数名和其他参数名不能冲突
intersection = set(req_params.keys()) & set(args_dict.keys())
if intersection:
raise runexc.ValidateError(
"req.params names : %s conflict with others" % list(
intersection))
for param_name, param_def in checker.param_defs.items():
# req.param参数需要进行转换
if param_name in req_params:
validate_with_exception_info(param_def, req_params, param_name)
else:
validate_with_exception_info(param_def, args_dict, param_name)
args_dict.update(req_params)
# 检查所入参是否检测到
for arg_name in args_dict:
if arg_name not in checker.param_defs:
raise runexc.VisibleExUnknownArgument(arg_name)
\ No newline at end of file
"""参数检测。"""
import copy
from simplegeneric import generic
from gm_apidesc.types import SenceType
from gm_apidesc.types import UserType
from gm_apidesc.types import PASS
from gm_apidesc import runexc
from .metatype import _BaseMeta
from ._sence_check import django_input_check
############################## 相关的schema定义检测 ##############################
def register_type(checker, sence_type):
"""场景类型检测"""
if not sence_type:
sence_type = SenceType.COMMON
if sence_type not in SenceType:
raise runexc.NotMatchError(sence_type, SenceType, "场景类型错误")
checker.type = sence_type
def register_param(checker, params):
"""参数定义检测"""
if not isinstance(params, dict):
raise runexc.SchemaFormatError("schema.param should be dict")
for param_name, values in params.items():
if not isinstance(values, (dict, UserType)):
raise runexc.SchemaFormatError("schema.param format error")
if isinstance(values, UserType):
checker.add_type_check(param_name, param_type=values)
elif isinstance(values, dict):
param_type = values.get("type")
if not param_type:
raise runexc.SchemaFormatError(
"schema.param: {param_name} missing 'type'".format(param_name=param_name))
values.pop("type")
checker.add_type_check(param_name, param_type=param_type, **values)
def register_return(checker, returns):
"""返回定义检测"""
pass
def register_desc(checker, desc):
"""描述检测"""
pass
############################## 相关运行时检测 ##############################
def validate_input(checker, args, kwargs):
"""检测入参。
:param checker:参数检测对象
:param args:位置参数值
:param kwargs:键值对参数值
"""
args_dict, req_params = combine_args(checker, args, kwargs)
# 参数名和其他参数名不能冲突
intersection = set(req_params.keys()) & set(args_dict.keys())
if intersection:
raise runexc.ValidateError(
"req.params names : %s conflict with others" % list(
intersection))
for param_name, param_def in checker.param_defs.items():
# request.POST/GET 参数需要进行转换
if param_name in req_params:
validate_with_exception_info(param_def, req_params, param_name)
else:
validate_with_exception_info(param_def, args_dict, param_name)
args_dict.update(req_params)
# 检查所入参是否检测到
for arg_name in args_dict:
if arg_name not in checker.param_defs:
raise runexc.VisibleExUnknownArgument(arg_name)
def validate_output(checker, return_type, result):
"""出参检测"""
pass
def combine_args(checker, args, kwargs):
"""将函数参数名称与参数值进行组合。
:param args: 位置参数值
:param kwargs: 键值对参数值
:return:
"""
args, req_param = checker.get_req_args(args, kwargs)
kwargs = copy.deepcopy(kwargs)
args_value = args
param_names = checker.param_names
# combine args and kwargs and req.params
args = dict(zip(param_names, args_value))
if kwargs:
args.update(kwargs)
return args, req_param
def validate_with_exception_info(param_def, dict_param, param_name):
"""
参数检测入口,提供异常信息的处理
:param param_def: 参数定义
:param dict_param: 参数所在的字典结构
:param param_name:参数名
:return:
"""
try:
check_param(param_def, dict_param, param_name)
except (runexc.VisibleExInvalidInput,
runexc.VisibleExMissingArgument,
runexc.VisibleExUnknownArgument):
raise
except Exception as e:
param_value = dict_param[param_name]
raise runexc.VisibleExInvalidInput(field=param_name,
value=param_value,
expect_cls=None,
error=e)
def check_param(typedef, value, key=None):
"""检测参数值。
:param typedef: 参数类型
:param value: 参数值所在的json
:param key: 参数对应的key
:return:
"""
if key:
check_typedef(typedef, value, key)
else:
datatype = typedef.type
verification(datatype, value)
def check_required(typedef, value, key):
if key not in value:
if typedef.required:
raise runexc.VisibleExMissingArgument(argname=key, value=value)
else:
return PASS
def check_nullable(typedef, value, key):
"""nullable包含None和空字符串。"""
if typedef.nullable:
if value[key] in ('', None):
return PASS
if not typedef.nullable:
if value[key] in ('', None):
raise runexc.VisibleExInvalidInput(field=key,
value=value,
expect_cls=typedef,
error='%s不能为null' % key)
return
CHECK_ITEM = [check_required, check_nullable]
def check_typedef(typedef, value, key):
"""
对参数定义项进行检测, 如nullable required
:param typedef:
:param value:
:param key:
:return:
"""
for item in CHECK_ITEM:
if item(typedef, value, key) == PASS:
return
from_param(typedef.type, value, key)
# todo 用户自定义类和字典类统一提供validate接口方法
def verification(datatype, value):
if isinstance(datatype, _BaseMeta):
check_complex_type(datatype, value)
else:
datatype.verification(value)
@generic
def from_param(datatype, value, key):
raise runexc.VisibleExInvalidInput(field=key,
value=value,
expect_cls=datatype,
error='参数错误')
@from_param.when_type(_BaseMeta)
def from_baseMeta(datatype, value, key):
check_complex_type(datatype, value[key])
@from_param.when_type(UserType)
def from_UserType(datatype, value, key):
datatype.type_check(value, key)
def check_complex_type(datatype_def, jdata):
"""检测json类型。"""
if not isinstance(jdata, dict):
raise runexc.ValidateError('参数格式错误')
checker = getattr(datatype_def, "CHECKER_TAG")
for attr_name in checker.attr_names:
attr_def = checker.get_elem_def(attr_name)
try:
check_param(attr_def, jdata, attr_name)
except (runexc.VisibleExInvalidInput,
runexc.VisibleExMissingArgument,
runexc.VisibleExUnknownArgument):
raise
except Exception as e:
if e.__class__.__name__.startswith('VisibleEx'):
raise runexc.VisibleExInvalidInput(field=attr_name,
value=jdata,
expect_cls=attr_def.type,
error=e)
else:
raise runexc.VisibleExInvalidInput(field=attr_name,
value=jdata,
expect_cls=attr_def.type,
error='参数错误')
extra_value = []
for key in jdata:
if key not in checker.attr_names:
extra_value.append(key)
if extra_value:
raise runexc.VisibleExUnknownArgument(argname=extra_value)
return jdata
import six
class MetaObject(object):
"""属性元类,动态设置属性,初始化只能输入键值对。"""
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
class _BaseMeta(type):
def __init__(cls, name, bases, dct):
if bases and bases[0] is MetaType:
checker = unregister(bases[0])
setattr(cls, "CHECKER_TAG", checker)
setattr(cls, '_return_type', False)
if checker:
checker.name = name
class MetaType(six.with_metaclass(_BaseMeta)):
"""
暂存子类的属性
"""
def __init__(self):
pass
@classmethod
def set_return_type(cls, value):
setattr(cls, '_return_type', value)
@classmethod
def get_return_type(cls):
return getattr(cls, '_return_type')
@classmethod
def tostring(cls):
return cls.__name__
import logging
LOG = logging.getLogger("gm_apidesc")
def _format_with_kwargs(msg_format, kwargs):
return msg_format % kwargs
class Error(Exception):
message_format = None
def __init__(self, message=None, **kwargs):
try:
message = self._build_message(message, **kwargs)
except KeyError:
message = self.message_format
super(Error, self).__init__(message)
def _build_message(self, message, **kwargs):
if message:
return message
return _format_with_kwargs(self.message_format, kwargs)
class ValidateError(Error):
def __init__(self, message=None):
super(ValidateError, self).__init__(message)
class VisibleExInvalidInput(ValidateError):
"""参数格式不对"""
message_format = 'Error: In data: %(input)s, invalid %(field)s: %(expect)s %(error)s'
def __init__(self, field, value, expect_cls=None, error=''):
expect = expect_cls.tostring() \
if expect_cls and hasattr(expect_cls, 'tostring') else ''
self.dbgmsg = self.message_format % {'field': field,
'input': value,
'expect': expect,
'error': error}
LOG.error(self.dbgmsg)
super(VisibleExInvalidInput, self).__init__(
'%(field)s输入错误:%(error)s' % {'field': field, 'error': error})
class VisibleExMissingArgument(ValidateError):
"""参数缺失"""
message_format = 'Error: in %(input)s, missing argument %(name)s'
def __init__(self, argname, value):
self.dbgmsg = self.message_format % {'name': argname, 'input': value}
LOG.error(self.dbgmsg)
super(VisibleExMissingArgument, self).__init__('缺少参数:%s' % argname)
class VisibleExUnknownArgument(ValidateError):
"""未知参数、多余参数"""
message_format = 'Error: Unknown argument: %(name)s'
def __init__(self, argname):
self.dbgmsg = self.message_format % {'name': argname}
LOG.error(self.dbgmsg)
super(VisibleExUnknownArgument, self).__init__('不接受参数:%s' % argname)
class NotMatchError(Error):
"""不匹配"""
message_format = 'Error: Type data: %(input)s, invalid %(field)s: %(expect)s %(error)s'
def __init__(self, _type, expect=None, error=''):
super(VisibleExInvalidInput, self).__init__(
'%(_type)s 匹配错误(%(expect)s):%(error)s'
% {'_type': _type, 'error': error, 'expect': expect}
)
class VisibleExValueError(ValidateError):
pass
"""对入口进行装饰。
使用场景:
1、django http 请求
2、gm_rpcd
3、普通函数
"""
import functools
import inspect
from django.views.generic import View
from django.http import HttpRequest
from gm_apidesc import runexc
from gm_apidesc import check
from gm_apidesc.check import MetaObject
from gm_apidesc.types import filter_inner_type
from gm_apidesc.types import Mapper
# schema 的几个参数,以及对应的检测工具
SCHEMA_CHENER = {
"type": check.register_type,
'param': check.register_param,
'return': check.register_return,
'desc': check.register_desc,
}
class Checker(object):
def __init__(self, func):
# FullArgSpec(args=['a', 'b', 'c'], varargs=None, varkw=None,
# defaults=(None,), kwonlyargs=[], kwonlydefaults=None, annotations={})
func_args = inspect.getfullargspec(func)
self.kwargsname = None
self._param_names = func_args.args
self.param_defs = {}
self.name = func.__name__
self.desc = None
self.return_type = None
self.return_desc = None
self.params_have_default = []
self.req = None
# 获取默认参数
default_len = len(func_args.defaults) if func_args.defaults else 0
self.params_have_default = func_args.args[-default_len]
def add_type_check(self, param_name, param_type, param_desc='', required=True,
nullable=False):
if param_name in self.params_have_default:
required = False
param_type = filter_inner_type(param_type)
if isinstance(param_type, Mapper):
param_type.set_valuename(param_name)
self.param_defs[param_name] = MetaObject(type=param_type,
desc=param_desc,
required=required,
nullable=nullable)
def add_desc(self, desc):
self.desc = desc
def add_ret(self, r_type, desc):
r_type = filter_inner_type(r_type)
self.return_type = MetaObject(type=r_type, desc=desc)
self.return_desc = desc
def get_elem_def(self, param_name):
if param_name in self.param_defs:
return self.param_defs[param_name]
def get_req_args(self, args, kwargs):
"""判断参数中是否有req,并获取req.GET/POST"""
http_args = {}
if not args:
return [], {}
if isinstance(args[0], View): # django class view
args = args[1:]
if isinstance(args[0], HttpRequest): # django class view
request = args[0]
args = args[1:]
http_args.update(request.GET.dict())
http_args.update(request.POST.dict())
return args, http_args
@property
def param_names(self):
return self._param_names
def schema(pattern):
api_schema_pattern = pattern
def decorator(func):
checker = Checker(func)
if not isinstance(api_schema_pattern, dict):
raise runexc.SchemaFormatError("schema必须是字典")
for key, value in api_schema_pattern.items():
if key not in SCHEMA_CHENER:
raise runexc.SchemaFormatError()
SCHEMA_CHENER[key](checker, value)
@functools.wraps(func)
def wrapper(*args, **kwargs):
check.validate_input(checker, args, kwargs)
result = func(*args, **kwargs)
# validate_output(return_type, result)
return result
return wrapper
return decorator
"""存放类型。
为了方便管理,所有的类型都必须使用自定的类型,一方面为了后续扩展,另外一方面后续会在这个类型上面
做一些操作:文档生成等等
"""
from .types import *
from .sence_type import SenceType
\ No newline at end of file
This diff is collapsed.
from ._enum import Enum
class SenceType(Enum):
DJANGO = (1, "Django")
RPC = (1, "gm rpcd")
COMMON = (1, "普通函数")
\ No newline at end of file
from datetime import datetime
import string
import abc
from gm_apidesc import runexc
PASS = True
def filter_inner_type(datatype):
"""将内置类型转换为自定义类型。"""
if datatype in (int, ):
return G_IntegerType
if datatype == str:
return G_StringType
if datatype == float:
return G_FloatType
if datatype == datetime:
return Datetime
if isinstance(datatype, list):
return ListType(datatype[0])
if isinstance(datatype, UserType):
return datatype
raise runexc.ValidateError("type %s do not allowed" % datatype)
class UserType(object):
name = None
whitespace = tuple(string.whitespace)
def type_check(self, jdata, key):
self.verification(jdata[key])
def verification(self, value):
# 如果字符串前后包含空字符报错
if type(value) in (str, ):
if value.endswith(self.whitespace) or value.startswith(
self.whitespace):
raise runexc.ValidateError(
"String Type do not allowed start or end with space")
self.validate(value)
@abc.abstractmethod
def validate(self, value):
pass
def sample(self):
# 示例
pass
def tostring(self):
pass
def set_return_type(self, value):
pass
class IntegerType(UserType):
name = "Integer"
allows_types = (int, str)
def __init__(self, minimum=None, maximum=None):
self.minimum = minimum
self.maximum = maximum
def validate(self, value):
if not isinstance(value, self.allows_types) or \
(isinstance(value, str) and not value.isdigit()):
raise runexc.VisibleExValueError('值必须是数字')
value = int(value)
if self.minimum is not None and value < self.minimum:
error = '值必须大于等于%s' % self.minimum
raise runexc.VisibleExValueError(error)
if self.maximum is not None and value > self.maximum:
error = '值必须小于等于%s' % self.maximum
raise runexc.VisibleExValueError(error)
class FloatType(IntegerType):
name = "Float"
allows_types = (int, float, str)
def validate(self, value):
if not isinstance(value, self.allows_types):
raise runexc.VisibleExValueError('值必须是数字')
if isinstance(value, str):
try:
value = float(value)
except:
raise runexc.VisibleExValueError('值必须是数字')
if self.minimum is not None and value < self.minimum:
error = '值必须大于等于%s' % self.minimum
raise runexc.VisibleExValueError(error)
if self.maximum is not None and value > self.maximum:
error = '值必须小于等于%s' % self.maximum
raise runexc.VisibleExValueError(error)
class _StringType(UserType):
name = "String"
def __init__(self, minlen=None, maxlen=None):
self.minlen = minlen
self.maxlen = maxlen
def validate(self, value):
if not isinstance(value, (str,)):
raise runexc.VisibleExValueError('值必须是字符串')
str_len = len(value)
if self.minlen is not None and str_len < self.minlen:
error = '字符串长度必须大于等于%s' % self.minlen
raise runexc.VisibleExValueError(error)
if self.maxlen is not None and str_len > self.maxlen:
error = '字符串长度必须小于等于%s' % self.maxlen
raise runexc.VisibleExValueError(error)
G_StringType = _StringType()
G_IntegerType = IntegerType()
G_FloatType = FloatType()
class _MapperDict(dict):
def __str__(self):
# 为dict定制打印格式
format_dict = dict([(k, v.tostring()) for k, v in self.items()])
return 'Mapper(%s)' % format_dict
class Mapper(UserType):
name = 'Mapper'
def __init__(self, keyname, mapper):
for key, value in mapper.items():
mapper[key] = filter_inner_type(value)
self.mapper = _MapperDict(mapper)
self.keyname = keyname
self.valuename = None
def set_valuename(self, valuename):
self.valuename = valuename
def type_check(self, jdata, key):
self.validate(jdata)
def sample(self):
keys = self.mapper.keys()
return self.mapper[keys[0]]
def tostring(self):
return self.mapper
def validate(self, value):
# 判断是映射类型
from gm_apidesc.check import verification
key = value[self.keyname]
if key in self.mapper:
verification(self.mapper[key], value[self.valuename])
else:
raise runexc.VisibleExMissingArgument(argname=key, value=value)
class ListType(UserType):
name = 'List'
def __init__(self, elemtype, minlen=None, maxlen=None):
self.elemtype = filter_inner_type(elemtype)
self.maxlen = maxlen
self.minlen = minlen
def validate(self, value):
if not isinstance(value, list):
raise runexc.VisibleExValueError('值必须是列表')
if self.minlen is not None and len(value) < self.minlen:
error = '元素个数必须大于%s' % self.minlen
raise runexc.VisibleExValueError(error)
if self.maxlen is not None and len(value) > self.maxlen:
error = '元素个数必须小于%s' % self.maxlen
raise runexc.VisibleExValueError(error)
for elem in value:
from sf_apidesc.paramchecker.checkparam import verification
verification(self.elemtype, elem)
def sample(self):
return self.elemtype
def tostring(self):
return "[%s]" % self.elemtype.tostring()
from setuptools import find_packages
from setuptools import setup
import os
import io
import re
import ast
# requirements
install_requires = [
'simplegeneric',
'django==1.8'
]
here = os.path.abspath(os.path.dirname(__file__))
with io.open(os.path.join(here, 'gm_apidesc', '__init__.py')) as f:
version = None
for line in f.readlines():
match = re.match(r"""__version__ *= *(.*)""", line)
if not match:
continue
content = match.groups()[0]
version = ast.literal_eval(content)
setup(
name="gm-apidesc",
version=version,
description="igengmei apidesc",
author="JackieZhang",
author_email="zhanglu@igengmei.com",
packages=find_packages(),
url="http://git.wanmeizhensuo.com/zhanglu/gm-apidesc",
install_requires=install_requires
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment