Commit a765c395 authored by 林丽's avatar 林丽

不加test_backend

parents
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Extra
.idea/
http://mars.test.gengmei.cc/
create_data:单纯的插入数据功能
log:单纯的制造log功能
stat:Mars接口测试(页面的接口)
test_data:通过case场景,测试Mars数据库的数据是否正确
plan1是已废弃的方案,plan2是现在列子(不完整,等文档)
之后的case都写在各自的模块下面
每天早的数据写在http://wiki.gengmei.cc/pages/viewpage.action?pageId=1837027
\ No newline at end of file
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
from test_backend.testing import TestCaseBase
from test_backend.connectMysql import ConnectMysql
import random
import string
from test_backend.settings import Settings
class Insert(TestCaseBase):
def insertData(self, sqlStr):
ConnectMysql.getDatas(sqlStr)
#print(deviceData)
#插入多条数据
times = 1
def test_Times(self):
i = 0
order_id = int(ConnectMysql.getDatas('select max(id) from zhengxing_test.api_order ')[0][0]) + i + 1
sqlStr = "INSERT INTO "+Settings.Backend.DBzhengxing_test+".api_order ( id, password, status, payment, expired_date, user_id," \
"service_id, created_time, last_modified, trade_no, trade_status, buyer_id, buyer_email," \
"total_fee, gmt_create, gmt_payment, origin_data, points, name, phone, address, postscript," \
"comments, settled_time, payment_channel, refund_comment, use_time, cash_back_failure_time, cash_back_fee," \
"cash_back_status, service_price, operate_user_id, validate_time, validated, service_snapshot," \
"service_item_key, source, service_mode, refund_time, cash_back_time, stage_app_id, is_stage, stage_status," \
"duration, is_settled) VALUES ( "+str(order_id)+",'', '1', '1000', '0000-00-00 00:00:00', '20295901', '4639585'," \
"NOW(), NOW(), '', '', '', '', '0', '0000-00-00 00:00:00', '0000-00-00 00:00:00'," \
" '', '0', 'uu', '18511585264', 'uu', '', '', '0000-00-00 00:00:00', " \
"'', '', '0000-00-00 00:00:00','0000-00-00 00:00:00', " \
"'0', '0', '4980', NULL, '0000-00-00 00:00:00', '1', '{}'," \
" '', '3', '0', '0000-00-00 00:00:00','0000-00-00 00:00:00', '', '0', '0', '0', '0');"
while i < self.times:
self.insertData(sqlStr)
i = i + 1
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
"""
****************************************************************
# collect.py
# Author : linli
# Version : 1.1.1
# Date : 2016-02-14
# Description: 埋点接口
****************************************************************
"""
from test_backend.testing import TestCaseBase
from test_backend.collect import Collect
from test_backend.device_data import Device
import time
import json
import zlib
from test_backend.connectMysql import ConnectMysql
import random
from test_backend.device_data import Device
from test_backend.log_data import LogData
#数据传送到/data/logstash/app_input/maidian
class RunerTimes(TestCaseBase):
def create_log_data(self, type, params, device_data):
paramList = LogData().getLogeData(type, params, device_data)
data = Collect.postDatas(paramList)
#记录到 /data/logstash/app_input/maidian
def test_Times(self):
i = 0
home_view_time = 10
device_data = Device().getDeviceData()
while i < home_view_time:
#首页PV
type = "page_view"
#business_id:doctor, service, diary, topic, zone
inTime = time.time()
outTime = time.time() + 5
params = {"page_name": "home", "business_id": '', "in": inTime, "out": outTime, "fake": 1}
self.log_data = self.create_log_data(type, params, device_data)
print('运行'+ str(type) + str( i+ 1) + '次')
i = i + 1
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
"""
****************************************************************
# login.py
# Author : linli
# Version : 1.1.2
# Date : 2016-02-022
# Description: 用户统计
****************************************************************
"""
import requests
from test_backend.testing import TestCaseBase
from test_backend.settings import Settings
class Daily(TestCaseBase):
def test_getDaily(self):
data = self.requests.get(self.base_url+"/stat/common/user/daily", data={})
def test_postDaily(self):
params = {'stat_start':'2016-01-05', 'stat_end':'2016-02-21', 'platform':'0', 'area':'全部'}
data = self.requests.post(self.base_url+"/stat/common/user/daily", data=params)
print(data.text)
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
from test_backend.connectMysql import ConnectMysql
from test_backend.run_summary_data import RunSummary
import json
from test_backend.testing import TestCaseBase
from test_backend.collect import Collect
from test_backend.log_data import LogData
from test_backend.device_data import Device
import datetime
import time
import random
from test_backend.settings import Settings
from test_backend.app_sessionId import AppSessionId
class Calculation(TestCaseBase):
#测试event
log_data = []
mysql_data = []
deviceId = 0#真正的设备型号id
#1、根据预期制造数据
def create_log_data(self, type, params, device_data, timenow=None, app_session_id=None):
paramList = LogData().getLogeData(type, params, device_data, timenow, app_session_id)
data = Collect.postDatas(paramList)
#记录到 /data/logstash/app_input/maidian
def get_mysql_data(self, sqlStr):
mysql_data = ConnectMysql.getDatas(sqlStr)
return mysql_data
def create_data(self):
i = 0
home_view_time = 10
deviceIdList = ConnectMysql.getDatas('select device_id,user_id from '+Settings.Backend.DBzhengxing_test+
'.statistic_device_user where id > 200')
deviceS = random.choice(deviceIdList)
device_id = deviceS[0]
device_data = Device().getDeviceData(int(device_id))
deviceData = ConnectMysql.getDatas('SELECT device_id from '+Settings.Backend.DBzhengxing_test+'.statistic_device '
'WHERE id = '+ str(device_id))
self.deviceId = deviceData[0][0]
while i < home_view_time:
app_session_id = AppSessionId().getAppSessionId()
type = "device_opened"
params = {}
self.log_data = self.create_log_data(type, params, device_data, app_session_id=app_session_id)
type = "on_app_session_over"
params = {"duration": "5"}
self.log_data = self.create_log_data(type, params, device_data, app_session_id=app_session_id)
print('运行'+ str(type) + str( i+ 1) + '次')
i = i + 1
#或者造mysql数据
#dbname = Settings.Backend.DBmars+'.statistic_eventsummary'
#DeleteData().delete_data(dbname, '2016-03-02')
#2、linlin,跑脚本
def run_summary_data(self):
commend = '--master local[1] release.jar ' \
'--run-task X2_DeviceAndUserStat -A taskProcessingDate=2016-03-04'.split()
RunSummary.run_summary_data(commend)
def compareList(self, listA, listB):
if len(listA) != len(listB):
print('长度不等')
return False
l1 = sorted(listA)
l2 = sorted(listB)
#print(l1)
for v1, v2 in zip(l1, l2):
if v1 != v2:
print('数据不等')
return False
return True
#3、公式比对预期值
def comparison_data(self):
actual_value = self.get_mysql_data("select da.device_id, da.active_date, da.usage_duration from "
+Settings.Backend.DBmars+".statistic_device_active da "
"WHERE da.active_date = '2016-03-04' and da.device_id='"+str(self.deviceId)+"'")
#print(actual_value)
#根据具体情况处理数据,假设处理完数据如下
expected_value = [(self.deviceId, datetime.date(2016, 3, 4), 50L)]
assert self.compareList(expected_value, actual_value)
def test_calculation(self):
#case:造数据部分
self.create_data()
#case:运行汇总数据脚本
self.run_summary_data()
#case:数据进行比对
self.comparison_data()
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
from test_backend.connectMysql import ConnectMysql
from test_backend.run_summary_data import RunSummary
import json
from test_backend.testing import TestCaseBase
from test_backend.collect import Collect
from test_backend.log_data import LogData
from test_backend.device_data import Device
import datetime
import time
import random
from test_backend.settings import Settings
from test_backend.app_sessionId import AppSessionId
class Calculation(TestCaseBase):
#测试event
log_data = []
mysql_data = []
deviceId = 0#真正的设备型号id
#1、根据预期制造数据
def create_log_data(self, type, params, device_data, timenow=None, app_session_id=None):
paramList = LogData().getLogeData(type, params, device_data, timenow, app_session_id)
data = Collect.postDatas(paramList)
#记录到 /data/logstash/app_input/maidian
def get_mysql_data(self, sqlStr):
mysql_data = ConnectMysql.getDatas(sqlStr)
return mysql_data
def create_data(self):
self.deviceId = AppSessionId().getAppSessionId()
device_r = [{"APP": {"is_activation": True},
"SYS": {"cl_id": str(self.deviceId), "channel": "Lenovo"},
"TIME": datetime.datetime.fromtimestamp(time.time()).strftime("%Y-%m-%dT%H:%M:%S+08:00")}]
Collect().write_gaia_log(device_r)
#或者造mysql数据
#dbname = Settings.Backend.DBmars+'.statistic_eventsummary'
#DeleteData().delete_data(dbname, '2016-03-02')
#2、linlin,跑脚本
def run_summary_data(self):
commend = '--master local[1] release.jar ' \
'--run-task X2_DeviceAndUserStat -A taskProcessingDate=2016-03-04'.split()
RunSummary.run_summary_data(commend)
def compareList(self, listA, listB):
if len(listA) != len(listB):
print('长度不等')
return False
l1 = sorted(listA)
l2 = sorted(listB)
#print(l1)
for v1, v2 in zip(l1, l2):
if v1 != v2:
print('数据不等')
return False
return True
#3、公式比对预期值
def comparison_data(self):
actual_value = self.get_mysql_data("select d.device_id, d.new_date, d.channel from "
+Settings.Backend.DBmars+".statistic_device d "
"WHERE d.new_date = '2016-03-04' and d.device_id='"+str(self.deviceId)+"'")
print(actual_value)
#根据具体情况处理数据,假设处理完数据如下
expected_value = [(self.deviceId, datetime.date(2016, 3, 4), "Lenovo")]
assert self.compareList(expected_value, actual_value)
def test_calculation(self):
#case:造数据部分
self.create_data()
#case:运行汇总数据脚本
self.run_summary_data()
#case:数据进行比对
self.comparison_data()
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
import csv
from test_backend.testing import TestCaseBase
from test_backend.connectMysql import ConnectMysql
import json
from test_backend.testing import TestCaseBase
from test_backend.collect import Collect
from test_backend.log_data import LogData
from test_backend.device_data import Device
import pandas as pd
import numpy as np
import datetime
import time
import random
from test_backend.settings import Settings
class Calculation(TestCaseBase):
#以首页轮播Banner为例
log_data = []
mysql_data = []
expected_value = {'date': '1', 'doctor_name': '2', 'new_convos': '3', 'noreply_convos': '4'}
#涉及公式 点击率 100%*该banner点击次数/首页PV
#1、根据预期制造数据
def create_log_data(self, type, params, device_data):
paramList = LogData().getLogeData(type, params, device_data)
data = Collect.postDatas(paramList)
#记录到 /data/logstash/app_input/maidian
def get_mysql_data(self, sqlStr):
mysql_data = ConnectMysql.getDatas(sqlStr)
return mysql_data
def create_data(self):
i = 0
banner_click_time = 2
home_view_time = 4
device_data = Device().getDeviceData(3)
while i < home_view_time:
#首页PV
type = "page_view"
#business_id:doctor, service, diary, topic, zone
inTime = time.time()
outTime = time.time() + 5
params = {"page_name": "home", "business_id": '', "in": inTime, "out": outTime, "fake": 0}
self.log_data = self.create_log_data(type, params, device_data)
print('运行'+ str(type) + str( i+ 1) + '次')
if i < banner_click_time:
#该banner点击次数
type = "on_click_banner"
banner_idList = [14, 77]
for id in banner_idList:
params = {"from": "home_banner", "banner_id": id}
self.log_data = self.create_log_data(type, params, device_data)
i = i + 1
#或者造mysql数据
#2、linlin,跑脚本
def run_summary_data(self):
time.sleep(20)
pass
def compareList(self, listA, listB):
if len(listA) != len(listB):
return False
l1 = sorted(listA)
l2 = sorted(listB)
for v1, v2 in zip(l1, l2):
if v1 != v2:
return False
return True
#3、公式比对预期值
def comparison_data(self):
#单列用户
expected_value = self.get_mysql_data("SELECT dv.stat_date, d.name, dv.new_pm_user_num, dv.noreply_pm_user_num "
" from "+Settings.Backend.DBzhengxing_test+".statistic_doctorview dv, zhengxing_test.api_doctor d "
"where dv.doctor_id = d.id and dv.stat_date = '2016-02-22' and "
"dv.doctor_id in(SELECT dd.doctor_id "
" from "+Settings.Backend.DBmars+".statistic_doctordetail dd "
"where dd.stat_type=1)")
actual_value = self.get_mysql_data("SELECT dd.date, dd.doctor_name, dd.new_convos, dd.noreply_convos "
" from "+Settings.Backend.DBmars+".statistic_doctordetail dd "
"where dd.date = '2016-02-22' and "
"dd.doctor_id in(SELECT dd.doctor_id "
" from "+Settings.Backend.DBmars+".statistic_doctordetail dd "
"where dd.stat_type=1)")
assert self.compareList(expected_value, actual_value)
#常例用户
expected_value = self.get_mysql_data("SELECT dv.stat_date, d.name, dv.new_pm_user_num, dv.noreply_pm_user_num "
" from "+Settings.Backend.DBzhengxing_test+".statistic_doctorview dv, "
"zhengxing_test.api_doctor d "
"where dv.doctor_id = d.id and dv.stat_date = '2016-02-22' "
"and (dv.new_pm_user_num>0 OR dv.noreply_pm_user_num>0)")
actual_value = self.get_mysql_data("SELECT dd.date, dd.doctor_name, dd.new_convos, dd.noreply_convos "
" from "+Settings.Backend.DBmars+".statistic_doctordetail dd "
"where dd.date = '2016-02-22' and "
"dd.doctor_id in(SELECT dd.doctor_id "
" from "+Settings.Backend.DBmars+".statistic_doctordetail dd "
"where dd.stat_type=0)")
assert self.compareList(expected_value, actual_value)
def test_calculation(self):
#case:造数据部分
#self.create_data()
#case:运行汇总数据脚本
self.run_summary_data()
#case:数据进行比对
self.comparison_data()
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
import csv
from test_backend.testing import TestCaseBase
from test_backend.connectMysql import ConnectMysql
from test_backend.run_summary_data import RunSummary
import json
from test_backend.testing import TestCaseBase
from test_backend.collect import Collect
from test_backend.log_data import LogData
from test_backend.device_data import Device
from test_backend.delete_mars_data import DeleteData
import pandas as pd
import numpy as np
import datetime
import time
import random
from test_backend.settings import Settings
class Calculation(TestCaseBase):
#测试event
log_data = []
mysql_data = []
#1、根据预期制造数据
def create_log_data(self, type, params, device_data, timenow=None):
paramList = LogData().getLogeData(type, params, device_data, timenow)
data = Collect.postDatas(paramList)
#记录到 /data/logstash/app_input/maidian
def get_mysql_data(self, sqlStr):
mysql_data = ConnectMysql.getDatas(sqlStr)
return mysql_data
def create_data(self):
i = 0
home_view_time = 10
device_data = Device().getDeviceData()
while i < home_view_time:
type = "main_click_tab" #
params = {"tab":"welfare"}#{"tag_id":1,"from":"diagnosis","type":"string","business_id":"string","action":"string"}
self.log_data = self.create_log_data(type, params, device_data)
print('运行'+ str(type) + str( i+ 1) + '次')
i = i + 1
#或者造mysql数据
#dbname = Settings.Backend.DBmars+'.statistic_eventsummary'
#DeleteData().delete_data(dbname, '2016-03-02')
#2、linlin,跑脚本
def run_summary_data(self):
commend = '--master local[1] release.jar ' \
'--run-task X2_MaidianSummary -A gmLogName=maidian -A taskProcessingDate=2016-03-03'.split()
RunSummary.run_summary_data(commend)
def compareList(self, listA, listB):
if len(listA) != len(listB):
print('长度不等')
return False
l1 = sorted(listA)
l2 = sorted(listB)
#print(l1)
for v1, v2 in zip(l1, l2):
if v1 != v2:
print('数据不等')
return False
return True
#3、公式比对预期值
def comparison_data(self):
actual_value = self.get_mysql_data("select d.date, d.eventdesc_id, d.page_view, d.user_view, d.device_view from "
+Settings.Backend.DBmars+".statistic_eventsummary d WHERE d.date = '2016-03-03'")
#print(actual_value)
#根据具体情况处理数据,假设处理完数据如下
expected_value = []
assert self.compareList(expected_value, actual_value)
def test_calculation(self):
#case:造数据部分
self.create_data()
#case:运行汇总数据脚本
self.run_summary_data()
#case:数据进行比对
self.comparison_data()
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
import csv
from test_backend.testing import TestCaseBase
from test_backend.connectMysql import ConnectMysql
from test_backend.run_summary_data import RunSummary
import json
from test_backend.testing import TestCaseBase
from test_backend.collect import Collect
from test_backend.log_data import LogData
from test_backend.device_data import Device
from test_backend.delete_mars_data import DeleteData
import pandas as pd
import numpy as np
import datetime
import time
import random
from test_backend.settings import Settings
class Calculation(TestCaseBase):
#测试page_view:
log_data = []
mysql_data = []
#1、根据预期制造数据
def create_log_data(self, type, params, device_data, timenow=None):
paramList = LogData().getLogeData(type, params, device_data, timenow)
data = Collect.postDatas(paramList)
#记录到 /data/logstash/app_input/maidian
def get_mysql_data(self, sqlStr):
mysql_data = ConnectMysql.getDatas(sqlStr)
return mysql_data
def create_data(self):
i = 0
home_view_time = 10
device_data = Device().getDeviceData()
while i < home_view_time:
type = "page_view"
#business_id:doctor, service, diary, topic, zone
inTime = time.time()
outTime = time.time() + 4.5
params = {"page_name": "event_promotion_coupon", "business_id": '', "in": inTime, "out": outTime, "fake": 0, "referrer": "welfare_home"}
self.log_data = self.create_log_data(type, params, device_data)
print('运行'+ str(type) + str( i+ 1) + '次')
i = i + 1
#或者造mysql数据
dbname = Settings.Backend.DBmars+'.statistic_pagesummary'
DeleteData().delete_data(dbname, '2016-02-29')
#2、linlin,跑脚本
def run_summary_data(self):
commend = '--master local[1] release.jar --run-task X2_AppPageSummary -A ' \
'gmLogName=maidian -A taskProcessingDate=2016-02-29'.split()
RunSummary.run_summary_data(commend)
def compareList(self, listA, listB):
if len(listA) != len(listB):
return False
l1 = sorted(listA)
l2 = sorted(listB)
for v1, v2 in zip(l1, l2):
if v1 != v2:
return False
return True
#3、公式比对预期值
def comparison_data(self):
actual_value = self.get_mysql_data("select d.date, d.event, d.event_type, d.page_view, d.user_view,d.device_view, d.duration from "
+Settings.Backend.DBmars+".statistic_pagesummary d WHERE d.date = '2016-02-29'")
#根据具体情况处理数据,假设处理完数据如下
expected_value = [(datetime.date(2016, 2, 29), 'home', 'page', 20L, 2L, 2L, 100L),
(datetime.date(2016, 2, 29), 'home', 'click', 10L, 1L, 1L, 50L),
(datetime.date(2016, 2, 29), 'register', 'page', 10L, 1L, 1L, 50L),
(datetime.date(2016, 2, 29), 'login_password', 'click', 11L, 2L, 2L, 55L),
(datetime.date(2016, 2, 29), 'diary_comment_list', 'click', 10L, 1L, 1L, 50L),
(datetime.date(2016, 2, 29), 'csc_name', 'click', 10L, 1L, 1L, 50L),
(datetime.date(2016, 2, 29), 'activity_promotion', 'page', 20L, 2L, 2L, 100L),
(datetime.date(2016, 2, 29), 'event_promotion_coupon', 'page', 10L, 1L, 1L, 45)]
assert self.compareList(expected_value, actual_value)
def test_calculation(self):
#case:造数据部分
self.create_data()
#case:运行汇总数据脚本
self.run_summary_data()
time.sleep(10)
#case:数据进行比对
self.comparison_data()
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
import csv
from test_backend.testing import TestCaseBase
from test_backend.connectMysql import ConnectMysql
import json
from test_backend.testing import TestCaseBase
from test_backend.collect import Collect
from test_backend.device_data import Device
import pandas as pd
import numpy as np
import datetime
import os
class Calculation(TestCaseBase):
log_data = []
mysql_data = []
#1、我们制造的log数据
def get_log_data(self):
today = datetime.date.today()
dataList = []
data = self.requests.get("http://log.test.gengmei.cc/maidian/maidian.log-"+str(today), data={})
data = data.text.split('\n')
for d in data[1:]:
dataList.append(json.loads(d))
#if
#print(json.dumps(json.loads(d), indent=4, sort_keys=True))
return dataList
#2、mysql的数据——读
def get_mysql_data(self):
mysql_data = ConnectMysql.getDatas('SELECT * from zhengxing_test.statistic_device limit 2')
return mysql_data
#3、计算——公式
def test_calculation(self):
self.log_data = self.get_log_data()
self.mysql_data = self.get_mysql_data()
df = pd.DataFrame(self.log_data)
df.to_csv(str(datetime.date.today())+'.csv')
#df.index.name='index'
#fileName = os.mknod(str(datetime.date.today())+'.csv')
#print(fileName)
"""with open('test_backend.resources.'+str(datetime.date.today())+'.csv', 'wb') as csvfile:
spamwriter = csv.writer(csvfile, dialect='excel')
spamwriter.writerow(df.head())"""
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
import csv
from test_backend.testing import TestCaseBase
from test_backend.connectMysql import ConnectMysql
import json
from test_backend.testing import TestCaseBase
from test_backend.collect import Collect
from test_backend.log_data import LogData
from test_backend.device_data import Device
import pandas as pd
import numpy as np
import datetime
import time
import random
class Calculation(TestCaseBase):
#以首页轮播Banner为例
log_data = []
mysql_data = []
expected_value = {'date': '1', 'bannername': '2', 'postion': '3', 'CPC': '4', 'postionref': '5'}
#涉及公式 点击率 100%*该banner点击次数/首页PV
#1、根据预期制造数据
def create_log_data(self, type, params, device_data):
paramList = LogData().getLogeData(type, params, device_data)
data = Collect.postDatas(paramList)
#记录到 /data/logstash/app_input/maidian
def get_mysql_data(self, sqlStr):
mysql_data = ConnectMysql.getDatas(sqlStr)
return mysql_data
def create_data(self):
i = 0
banner_click_time = 2
home_view_time = 4
device_data = Device().getDeviceData(3)
while i < home_view_time:
#首页PV
type = "page_view"
#business_id:doctor, service, diary, topic, zone
inTime = time.time()
outTime = time.time() + 5
params = {"page_name": "home", "business_id": '', "in": inTime, "out": outTime, "fake": 0}
self.log_data = self.create_log_data(type, params, device_data)
print('运行'+ str(type) + str( i+ 1) + '次')
if i < banner_click_time:
#该banner点击次数
type = "on_click_banner"
banner_idList = [14, 77]
for id in banner_idList:
params = {"from": "home_banner", "banner_id": id}
self.log_data = self.create_log_data(type, params, device_data)
i = i + 1
#或者造mysql数据
#2、linlin,跑脚本
def run_summary_data(self):
time.sleep(20)
pass
#3、公式比对预期值
def comparison_data(self):
data = self.get_mysql_data("SELECT * from hera_dev.statistic_userdaily where stat_date = '2016-01-17'")
print(data)
#根据具体情况处理数据,假设处理完数据如下
dataline = {'date': '1', 'bannername': '2', 'postion': '3', 'CPC': '4', 'postionref': '5'}
for key, value in self.expected_value.items():
assert dataline[key] == value
def test_calculation(self):
#case:造数据部分
#self.create_data()
#case:运行汇总数据脚本
self.run_summary_data()
#case:数据进行比对
self.comparison_data()
\ No newline at end of file
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
from test_backend.connectMysql import ConnectMysql
from test_backend.run_summary_data import RunSummary
import json
from test_backend.testing import TestCaseBase
from test_backend.collect import Collect
from test_backend.log_data import LogData
from test_backend.device_data import Device
import datetime
import time
import random
from test_backend.settings import Settings
from test_backend.app_sessionId import AppSessionId
class Calculation(TestCaseBase):
#测试event
log_data = []
mysql_data = []
user_id = 0
#1、根据预期制造数据
def create_log_data(self, type, params, device_data, timenow=None, app_session_id=None):
paramList = LogData().getLogeData(type, params, device_data, timenow, app_session_id)
data = Collect.postDatas(paramList)
#记录到 /data/logstash/app_input/maidian
def get_mysql_data(self, sqlStr):
mysql_data = ConnectMysql.getDatas(sqlStr)
return mysql_data
def create_data(self):
i = 0
home_view_time = 10
deviceIdList = ConnectMysql.getDatas('select device_id,user_id from '+Settings.Backend.DBzhengxing_test+
'.statistic_device_user where id > 200')
deviceS = random.choice(deviceIdList)
device_id = deviceS[0]
self.user_id = deviceS[1]
device_data = Device().getDeviceData(int(device_id))
while i < home_view_time:
app_session_id = AppSessionId().getAppSessionId()
type = "device_opened"
params = {}
self.log_data = self.create_log_data(type, params, device_data, app_session_id=app_session_id)
type = "on_app_session_over"
params = {"duration": "5"}
self.log_data = self.create_log_data(type, params, device_data,timenow="2016-03-04", app_session_id=app_session_id)
print('运行'+ str(type) + str( i+ 1) + '次')
i = i + 1
#或者造mysql数据
#dbname = Settings.Backend.DBmars+'.statistic_eventsummary'
#DeleteData().delete_data(dbname, '2016-03-02')
#2、linlin,跑脚本
def run_summary_data(self):
commend = '--master local[1] release.jar ' \
'--run-task X2_DeviceAndUserStat -A taskProcessingDate=2016-03-04'.split()
RunSummary.run_summary_data(commend)
def compareList(self, listA, listB):
if len(listA) != len(listB):
print('长度不等')
return False
l1 = sorted(listA)
l2 = sorted(listB)
#print(l1)
for v1, v2 in zip(l1, l2):
if v1 != v2:
print('数据不等')
return False
return True
#3、公式比对预期值
def comparison_data(self):
actual_value = self.get_mysql_data("select ua.user_id, ua.active_date, ua.usage_duration from "
+Settings.Backend.DBmars+".statistic_user_active ua "
"WHERE ua.active_date = '2016-03-04' and ua.user_id="+str(self.user_id))
#print(actual_value)
#根据具体情况处理数据,假设处理完数据如下
expected_value = [(long(self.user_id), datetime.date(2016, 3, 4), 50L)]
assert self.compareList(expected_value, actual_value)
def test_calculation(self):
#case:造数据部分
self.create_data()
#case:运行汇总数据脚本
self.run_summary_data()
#case:数据进行比对
self.comparison_data()
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
from test_backend.connectMysql import ConnectMysql
from test_backend.run_summary_data import RunSummary
import json
from test_backend.testing import TestCaseBase
from test_backend.collect import Collect
from test_backend.log_data import LogData
from test_backend.device_data import Device
import datetime
import time
import random
from test_backend.settings import Settings
from test_backend.app_sessionId import AppSessionId
class Calculation(TestCaseBase):
#测试event
log_data = []
mysql_data = []
userId = 0#真正的设备型号id
#1、根据预期制造数据
def create_log_data(self, type, params, device_data, timenow=None, app_session_id=None):
paramList = LogData().getLogeData(type, params, device_data, timenow, app_session_id)
data = Collect.postDatas(paramList)
#记录到 /data/logstash/app_input/maidian
def get_mysql_data(self, sqlStr):
mysql_data = ConnectMysql.getDatas(sqlStr)
return mysql_data
def create_data(self):
user_id = ConnectMysql.getDatas('select max(id) from '+Settings.Backend.DBmars+'.statistic_user')[0][0]
self.userId = int(user_id) + 1
device_r = [{"APP": {"user_register_info": {"user_id": self.userId}},
"SYS": {"channel": "Lenovo"},
"TIME": datetime.datetime.fromtimestamp(time.time()).strftime("%Y-%m-%dT%H:%M:%S+08:00")}]
Collect().write_gaia_log(device_r)
#或者造mysql数据
#dbname = Settings.Backend.DBmars+'.statistic_eventsummary'
#DeleteData().delete_data(dbname, '2016-03-02')
#2、linlin,跑脚本
def run_summary_data(self):
commend = '--master local[1] release.jar ' \
'--run-task X2_DeviceAndUserStat -A taskProcessingDate=2016-03-04 -A dbEnableDebug=1'.split()
RunSummary.run_summary_data(commend)
def compareList(self, listA, listB):
if len(listA) != len(listB):
print('长度不等')
return False
l1 = sorted(listA)
l2 = sorted(listB)
#print(l1)
for v1, v2 in zip(l1, l2):
if v1 != v2:
print('数据不等')
return False
return True
#3、公式比对预期值
def comparison_data(self):
actual_value = self.get_mysql_data("select u.user_id, u.new_date, u.channel from "
+Settings.Backend.DBmars+".statistic_user u "
"WHERE u.new_date = '2016-03-04' and u.user_id="+str(self.userId))
print(actual_value)
#根据具体情况处理数据,假设处理完数据如下
expected_value = [(self.userId, datetime.date(2016, 3, 4), "Lenovo")]
assert self.compareList(expected_value, actual_value)
def test_calculation(self):
#case:造数据部分
self.create_data()
#case:运行汇总数据脚本
self.run_summary_data()
#case:数据进行比对
self.comparison_data()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment