Commit a92940e9 authored by liangfenglong's avatar liangfenglong

初始框架上传

parent 597e9e8a
# something here
LIVE_USER = {
'phone': '18633668899',
'password': '123456'
}
HERA_SESSION = 'tm115sjlt2nzimgxfa34c91xghvor1ue'
# 公共参数
GENGMEI_PARAMS = {
'app_name': 'com.wanmeizhensuo.zhensuo',
'version': '7.38.0',
'platform': 'android',
'device_id': '869574031646601', # 这个地方根据需要 可能需要修改
'os_version': '8.0.0', # 不影响逻辑
'model': 'BND-AL10', # 不影响逻辑
'screen': '1080x2038', # 不影响逻辑
'lat': '40.002237', # 不影响逻辑
'lng': '116.487012', # 不影响逻辑
'channel': 'benzhan', # 不影响逻辑
'current_city_id': 'beijing',
'manufacturer': 'HUAWEI', # 不影响逻辑
'uuid': '19a56bb4-4ff7-46d4-b156-904297b91385' # 不影响逻辑
}
BACKEND_ADMIN = 'https://backend.igengmei.com'
import pytest
# 入口,直接使用命令行不会跑到该入口
if __name__ == '__main__':
pytest.main()
import unittest
import requests
from utils.gmhttp import require_login
class LiveCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
# 做一些前置条件
'''
1. 创建直播
2. hera上为直播做配置
:return:
'''
cls.create_live()
@classmethod
@require_login
def create_live(cls):
'''
创建直播间
:return:
'''
r = requests.post('xxx').json()
cls.live_id = r['']
@classmethod
@require_login
def close_live(cls):
'''
关闭直播间
:return:
'''
r = requests.post('xxx').json()
print('关闭直播间成功!')
\ No newline at end of file
import unittest
import requests
from utils.gmhttp import require_login
class LiveXXX(unittest.TestCase):
@require_login
def test_xxx(self):
'''
to be continued
:return:
'''
# 直播id self.live_id
import requests
from requests.cookies import cookiejar_from_dict
from conf.settings import HERA_SESSION
hera_session = requests.Session()
hera_session.cookies = cookiejar_from_dict({"session_key": HERA_SESSION})
# 封装一次 session
import requests
import sys
from functools import wraps
from copy import deepcopy
from conf import settings
# 使用from import * 仅允许require_login
__all__ = ['require_login']
class LoginFailedException(Exception):
'''
登录失败
'''
pass
# 支持更换用户
def require_login(arg=None, **user_dict):
'''
依然使用requests,只不过在使用过程中被替换成了gmhttp
:param func:
:return:
'''
def outer(func):
@wraps(func)
def _inner(*args, **kwargs):
# 哈哈,不知道写啥
try:
newhttp = _Gmhttp.instance(**user_dict)
except LoginFailedException:
# 登录失败后,使用无登录状态尝试访问
return func(*args, **kwargs)
# 为了减少修改原有代码结构,仅需要对测试case添加装饰器require_login
func_module = sys.modules.get(func.__module__)
source_requests = getattr(func_module, 'requests')
setattr(func_module, 'requests', newhttp)
require_login.uid = newhttp.uid
try:
return func(*args, **kwargs)
except Exception:
pass
finally:
setattr(func_module, 'requests', source_requests)
return _inner
# 为了兼容@require_login 直接使用
if callable(arg):
return outer(arg)
return outer
# 内部使用
class _Gmhttp(requests.Session):
'''
封装一个类,提供request的各种方法
'''
# _object = None
_params = settings.GENGMEI_PARAMS
_host = settings.BACKEND_ADMIN # 这种地方还是需要优化,看配置在何处比较方便工程使用
_userinfo = settings.LIVE_USER # 这种地方还是需要优化,看配置在何处比较方便工程使用
_exist_user_objs = {}
# 不同手机号 相同实例,这个方法不好,直接使用instance生成实例
# def __new__(cls, *args, **kwargs):
# '''
# 做一个单例,最好直接使用gmhttp,天然单例
# :param args:
# :param kwargs:
# :return:
# '''
# if cls._object == None:
# cls._object = super().__new__(cls, *args, **kwargs)
# return cls._object
def __init__(self, phone, password, *args, **kwargs):
'''
传一些默认值
:param args:
:param kwargs:
'''
super().__init__(*args, **kwargs)
self.phone = phone
self.password = password
self.params = self._params
self.verify = False # 避开证书
def reset(self):
# 用于灰度params的设置
self.params = deepcopy(self._params)
@classmethod
def instance(cls, userinfo):
# 登录,如果phone和password有任意None 则使用默认账户
phone = userinfo.get('phone', _Gmhttp._userinfo['phone'])
password = userinfo.get('password', _Gmhttp._userinfo['password'])
# 先获取是否已经实例化,若有直接返回实例
obj = _Gmhttp._exist_user_objs.get(phone, None)
if obj == None:
# 防止多次条用登录接口
obj = _Gmhttp(phone, password)
obj.login()
_Gmhttp._exist_user_objs[phone] = obj
return obj
def login(self):
# 没有实例,进行实例化
data = {"phone": self.phone, 'password': self.password}
rep = self.post(
url=_Gmhttp._host + '/api/accounts/login/password',
data=data
)
try:
res = rep.json()
except:
pass
else:
if res['error'] != 0:
raise LoginFailedException('登录失败', res['message'])
self.uid = res['data']['uid']
def logout(self):
'''
登出
:return:
'''
rep = self.post(
url=self._host + '/api/accounts/logout',
data=self._userinfo
)
try:
res = rep.json()
except:
pass
else:
if res['error'] == 0 and res['message'] == '注销成功':
_Gmhttp._exist_user_objs.pop(self.phone)
def __del__(self):
self.close()
# 这个是 long 账号的实例 ,不一定所有环境都支持,先注释掉
# gmhttp = Gmhttp.instance()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment