Commit 70b51392 authored by 张彦钊's avatar 张彦钊

change test file

parent f6c30f03
# -*- coding: UTF-8 -*-
import datetime
import json

import pandas as pd
import pymysql
import redis
def get_yesterday_date():
    """Return yesterday's date as a 'YYYY-MM-DD' string (also prints it)."""
    target = datetime.date.today() - datetime.timedelta(days=1)
    formatted = target.strftime("%Y-%m-%d")
    print(formatted)
    return formatted
def get_black_user():
    """Fetch the distinct blacklisted device ids from jerry_prod.blacklist.

    Returns:
        list: device id values; empty list when the table has no rows
        (the original DataFrame[0] access raised on an empty result).
    """
    # NOTE(review): credentials are hard-coded; move to config/env.
    conn2db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                              passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
    try:
        cursor = conn2db.cursor()
        cursor.execute("select distinct device_id from blacklist")
        result = cursor.fetchall()
        cursor.close()
    finally:
        # Original leaked the connection when execute() raised.
        conn2db.close()
    # Plain comprehension replaces the pandas round-trip.
    return [row[0] for row in result]
def get_data():
    """Return device ids ending in 5 or 6 from eagle.ffm_diary_queue_temp,
    minus the blacklist, as a tuple.

    Relies on module-level ``black`` (the blacklist id collection).
    """
    conn2db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                              passwd='3SYz54LS9#^9sBvC', db='eagle')
    try:
        cursor = conn2db.cursor()
        # NOTE(review): inside a char class, [5|6] also matches a literal '|';
        # harmless for numeric ids, but '[56]$' would state the intent.
        cursor.execute("select distinct device_id from ffm_diary_queue_temp "
                       "where device_id regexp '[5|6]$'")
        result = cursor.fetchall()
        cursor.close()
    finally:
        # Original leaked the connection when execute() raised.
        conn2db.close()
    # Build the set directly; no pandas detour, safe on an empty result.
    device = {row[0] for row in result}
    return tuple(device - set(black))
def ctr_all():
    """Report coverage of the experiment devices among the day's active
    tail-5/6 users.

    Reads module-level globals: ``date``, ``black``, ``device_id``.

    Returns:
        tuple: (active_count, covered_count, coverage_ratio_str) where the
        ratio is formatted to 6 decimal places.
    """
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
    try:
        cursor = db.cursor()
        # NOTE(review): [5|6] also matches '|' inside a char class; kept
        # byte-identical because device ids are numeric.
        sql_active = "select distinct device_id from data_feed_exposure " \
                     "where cid_type = 'diary'" \
                     "and device_id regexp'[5|6]$' and stat_date = '{}';".format(date)
        cursor.execute(sql_active)
        result = cursor.fetchall()
    finally:
        # Original never closed the connection.
        db.close()
    tail56 = {row[0] for row in result} - set(black)
    print("当天尾号5或6活跃用户总数:")
    print(len(tail56))
    cover = len(tail56 & set(device_id))
    print("当天尾号5或6活跃用户覆盖数:")
    print(cover)
    # Guard the empty-day case instead of raising ZeroDivisionError.
    cover_percent = format(cover / len(tail56), ".6f") if tail56 else format(0.0, ".6f")
    print("当天尾号5或6活跃用户覆盖率:")
    print(cover_percent)
    return len(tail56), cover, cover_percent
def ctr():
    """Compute click count, exposure count and CTR for the experiment
    devices on ``date``.

    Reads module-level globals: ``date``, ``device_id``.

    Returns:
        tuple: (clicks, exposures, ctr_str) with the CTR formatted to
        6 decimal places.
    """
    # NOTE(review): SQL built via str.format — acceptable for these
    # internally-generated values, but parameterized queries are safer.
    sql_click = "select count(cid) from data_feed_click " \
                "where (cid_type = 'diary' or cid_type = 'diary_video') " \
                "and stat_date = '{}' and device_id in {};".format(date, device_id)
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
    try:
        cursor = db.cursor()
        cursor.execute(sql_click)
        click = cursor.fetchone()[0]
        print("实验用户点击数:" + str(click))
        sql_exp = "select count(cid) from data_feed_exposure " \
                  "where cid_type = 'diary'" \
                  "and stat_date = '{}' and device_id in {};".format(date, device_id)
        cursor.execute(sql_exp)
        exp = cursor.fetchone()[0]
        print("实验用户曝光数:" + str(exp))
    finally:
        # Original never closed the connection.
        db.close()
    # NOTE(review): exp == 0 raises ZeroDivisionError — confirm upstream
    # guarantees at least one exposure row for the device set.
    print("实验用户点击率:" + str(click / exp))
    return click, exp, format(click / exp, ".6f")
def rate2file():
    """Append the day's coverage and CTR numbers as one CSV row to 56ctr.csv.

    Reads module-level globals: ``DIRECTORY_PATH``, ``date``, ``temp_data``
    (coverage triple) and ``data`` (CTR triple).
    """
    output_path = DIRECTORY_PATH + "56ctr.csv"
    fields = [
        date.replace('-', ''),
        str(temp_data[0]), str(temp_data[1]), str(temp_data[2]),
        str(data[0]), str(data[1]), str(data[2]),
    ]
    with open(output_path, 'a+') as f:
        f.write(",".join(fields) + '\n')
# NOTE(review): fragment from a side-by-side diff paste — these statements
# belong to the new test script's __main__ body and rely on ``r`` (a redis
# client) and ``device_id`` defined earlier in that script; verify placement.
# Build the per-device redis hash key for the search-recommend tractate queue.
search_topic_recommend_key = "TS:search_recommend_tractate_queue:device_id:" + str(device_id)
# Store a dummy queue payload under the 'tractate_queue' hash field.
value = json.dumps([1,2,3])
r.hset(search_topic_recommend_key,'tractate_queue',value)
def get_time():
    """Load every dur_time value from jerry_test.cost_time.

    Returns:
        pandas.DataFrame: one column (0) of dur_time values; empty frame
        when the table has no rows.
    """
    db = pymysql.connect(host='172.16.40.158', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    try:
        cursor = db.cursor()
        cursor.execute("select dur_time from cost_time")
        result = cursor.fetchall()
        # Original leaked the cursor; close it before the connection.
        cursor.close()
    finally:
        db.close()
    return pd.DataFrame(list(result))
# NOTE(review): stray import-time debug output — looks like leftover
# debugging; consider removing.
print(1)
if __name__ == "__main__":
    # Purge stale esmm training rows in 15 batches of up to 60000, committing
    # after each chunk so no single transaction grows huge.
    db = pymysql.connect(host='172.16.40.158', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "delete from esmm_train_data_dwell where stat_date >= '2019-07-28' limit 60000"
    try:
        cursor = db.cursor()
        for _ in range(15):
            cursor.execute(sql)
            db.commit()
    finally:
        # Original never closed the connection.
        db.close()
...@@ -20,7 +20,8 @@ import datetime ...@@ -20,7 +20,8 @@ import datetime
def Json(x): def Json(x):
data = json.loads(x[1]) data = json.loads(x[1])
if 'type' in data and 'device' in data and 'params' in data and 'card_content_type' in data['params']: if 'type' in data and 'device' in data and 'params' in data and 'card_content_type' in data['params']:
if data['type'] == 'on_click_card' and data["device"]["device_id"] == "E417C286-40A4-42F6-BDA9-AEEBD8FEC3B6": if data['type'] == 'on_click_card' and data["device"]["device_id"] == "E417C286-40A4-42F6-BDA9-AEEBD8FEC3B6" \
and data['params']['card_content_type'] in ("answer","user_post"):
return True return True
else: else:
return False return False
...@@ -30,8 +31,8 @@ def Json(x): ...@@ -30,8 +31,8 @@ def Json(x):
def model(rdd):
    """Filter a maidian event RDD down to the tracked click events and map
    them through get_data (the write_redis step is disabled in this revision).

    Returns:
        The transformed RDD, or None when the transform raises.
    """
    try:
        out = rdd.filter(lambda x: Json(x)).repartition(10).map(lambda x: get_data(x))
        # .map(lambda x: write_redis(x[0], x[1], x[2]))  # disabled in this revision
        return out
    except Exception:
        # Narrowed from a bare except so KeyboardInterrupt/SystemExit still
        # propagate. NOTE(review): printing only "fail" hides the cause —
        # consider logging the exception.
        print("fail")
...@@ -192,7 +193,6 @@ def diary_write(device_id,cid): ...@@ -192,7 +193,6 @@ def diary_write(device_id,cid):
cursor.execute(sql) cursor.execute(sql)
result = cursor.fetchall() result = cursor.fetchall()
if len(result) > 0: if len(result) > 0:
print("cunzai")
tags = result[0][0] tags = result[0][0]
if tags is not None: if tags is not None:
sql = "select a.id from src_mimas_prod_api_diary a left join src_mimas_prod_api_diary_tags b " \ sql = "select a.id from src_mimas_prod_api_diary a left join src_mimas_prod_api_diary_tags b " \
...@@ -221,23 +221,23 @@ def diary_write(device_id,cid): ...@@ -221,23 +221,23 @@ def diary_write(device_id,cid):
print(e) print(e)
# --- Spark streaming driver --------------------------------------------------
# Reconstructed from a side-by-side diff paste: the new revision re-enables
# the streaming job and comments out the one-off diary_write call.
# NOTE(review): top-level side effects — consider an `if __name__` guard.
sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("dislike").set("spark.io.compression.codec", "lzf"))
ssc = StreamingContext(sc, 4)
sc.setLogLevel("WARN")

# Kafka direct-stream configuration for the maidian event topic.
kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
               "group.id": "dislike",
               "socket.timeout.ms": "600000",
               "auto.offset.reset": "largest"}

stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams)
transformstream = stream.transform(lambda x: model(x))
transformstream.pprint()

ssc.start()
ssc.awaitTermination()
# diary_write("9C5E7C73-380C-4623-8F48-A64C8034E315",16952841)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment