Commit c4fa88d5 authored by 张彦钊's avatar 张彦钊

update dairyQueueUpdate file

parent 4cecec77
......@@ -7,11 +7,10 @@ from datetime import datetime
import utils
import warnings
from multiprocessing import Pool
from config import *
import json
from userProfile import get_active_users
from sklearn.preprocessing import MinMaxScaler
import time
from userProfile import get_active_users
from config import *
import os
......@@ -27,65 +26,6 @@ def get_video_id():
db.close()
return video_id
def test_con_sql(device_id):
    """Fetch the four diary queues for *device_id* from doris_prod.

    Each queue is split on commas, prefixed with "diary|", and intersected
    with the global ``data_set_cid`` feature set (set intersection, so the
    original ordering is not preserved).

    Returns:
        (found, native, nearby, nation, megacity) where ``found`` is False
        and the lists are empty when the device has no diary-queue row.
    """
    db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris',
                         passwd='o5gbA27hXHHm', db='doris_prod')
    try:
        cursor = db.cursor()
        # Parameterized query: the original interpolated device_id directly
        # into the SQL string (injection-prone).
        sql = ("select native_queue,nearby_queue,nation_queue,megacity_queue "
               "from device_diary_queue where device_id = %s;")
        cursor.execute(sql, (device_id,))
        result = cursor.fetchall()
    finally:
        # The original only closed the connection on the non-empty branch,
        # leaking it whenever the device had no row.
        db.close()
    df = pd.DataFrame(list(result))
    if df.empty:
        print("该用户对应的日记队列为空")
        return False, [], [], [], []
    df = df.rename(columns={0: "native_queue", 1: "nearby_queue", 2: "nation_queue", 3: "megacity_queue"})

    def _prepare(column):
        # One shared helper instead of the same four-line block repeated
        # once per queue.
        ids = ("diary|" + str(x) for x in df.loc[0, column].split(","))
        return list(set(ids) & set(data_set_cid))

    return (True, _prepare("native_queue"), _prepare("nearby_queue"),
            _prepare("nation_queue"), _prepare("megacity_queue"))
# 更新前获取最新的native_queue (re-read the freshest native_queue before updating)
def get_native_queue(device_id):
    """Return the device's native diary queue as "diary|<id>" strings,
    filtered against the global ``data_set_cid``, or None when the device
    has no queue row."""
    db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris',
                         passwd='o5gbA27hXHHm', db='doris_prod')
    try:
        cursor = db.cursor()
        # Parameterized instead of str.format to avoid SQL injection.
        sql = "select native_queue from device_diary_queue where device_id = %s;"
        cursor.execute(sql, (device_id,))
        result = cursor.fetchall()
    finally:
        # Original leaked the connection when the result set was empty.
        db.close()
    df = pd.DataFrame(list(result))
    if df.empty:
        return None
    native_queue = ["diary|" + str(x) for x in df.loc[0, 0].split(",")]
    return list(set(native_queue) & set(data_set_cid))
# 将device_id、city_id拼接到对应的城市热门日记表。注意:下面预测集特征顺序要与训练集保持一致
def feature_en(x_list, device_id):
data = pd.DataFrame(x_list)
......@@ -106,203 +46,237 @@ def feature_en(x_list, device_id):
return data
# 把ffm.pkl load进来,将上面的数据转化为ffm格式
def transform_ffm_format(df, queue_name, device_id):
    """Convert *df* to libffm format using the pickled encoder and write it
    to result/<device_id>_<queue_name>.csv.

    Returns the path of the written prediction file.

    NOTE(review): unpickling ffm.pkl executes arbitrary code — the file must
    come from a trusted source.
    """
    with open(DIRECTORY_PATH + "ffm.pkl", "rb") as f:
        ffm_format_pandas = pickle.load(f)
    data = ffm_format_pandas.native_transform(df)
    predict_file_name = DIRECTORY_PATH + "result/{0}_{1}.csv".format(device_id, queue_name)
    data.to_csv(predict_file_name, index=False, header=None)
    print("done ffm")
    return predict_file_name
# 将模型加载,预测
def predict(queue_name, queue_arg, device_id, city_id):
    """Run the xlearn FFM model over queue_arg[0] (the predictable cids) and
    write sigmoid scores to result/output<device_id>_<queue_name>.csv.

    city_id is currently unused here but kept so every pipeline step shares
    the same signature.
    """
    data = feature_en(queue_arg[0], device_id)
    data_file_path = transform_ffm_format(data, queue_name, device_id)
    ffm_model = xl.create_ffm()
    ffm_model.setTest(data_file_path)
    ffm_model.setSigmoid()  # emit probabilities rather than raw scores
    ffm_model.predict(DIRECTORY_PATH + "model.out",
                      DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id, queue_name))
def save_result(queue_name, queue_arg, device_id):
    """Read the raw model scores back, min-max scale them, and attach cids.

    queue_arg[0] holds the cids that were scored by the model and
    queue_arg[1] the cids that were not; the latter are appended with
    score 0 so the downstream merge sees every diary. The "diary|" prefix
    (6 characters) is stripped from every cid.

    Returns a DataFrame with columns "score" and "cid".
    """
    score_df = pd.read_csv(DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id, queue_name), header=None)
    mm_scaler = MinMaxScaler()
    mm_scaler.fit(score_df)
    score_df = pd.DataFrame(mm_scaler.transform(score_df))
    score_df = score_df.rename(columns={0: "score"})
    score_df["cid"] = queue_arg[0]
    # 去掉cid前面的"diary|"
    score_df["cid"] = score_df["cid"].apply(lambda x: x[6:])
    print("score_df:")
    print(score_df.head(1))
    print(score_df.shape)
    if queue_arg[1] != []:
        df_temp = pd.DataFrame(queue_arg[1]).rename(columns={0: "cid"})
        df_temp["score"] = 0
        # Reverse column order so it matches score_df ("score", "cid").
        df_temp = df_temp.sort_index(axis=1, ascending=False)
        df_temp["cid"] = df_temp["cid"].apply(lambda x: x[6:])
        print("temp_df:")
        print(df_temp.head(1))
        print(df_temp.shape)
        # DataFrame.append was deprecated and removed in pandas 2.0;
        # pd.concat is the supported equivalent.
        predict_score_df = pd.concat([score_df, df_temp])
        print("score_df:")
        print(predict_score_df.head(1))
        print(predict_score_df.shape)
        return predict_score_df
    else:
        return score_df
def get_score(queue_arg, predict_score_df):
    """Fetch the offline feed scores for every diary in queue_arg[2].

    Returns a DataFrame with columns "score" and "cid", or False when the
    score table has no rows for these diaries. ``predict_score_df`` is not
    used here; it is kept for call-site compatibility.
    """
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
    try:
        cursor = db.cursor()
        # 去除diary_id 前面的"diary|"
        diary_list = tuple(x[6:] for x in queue_arg[2])
        # NOTE(review): formatting a Python tuple into SQL breaks for a
        # single-element tuple (trailing comma) and is injection-prone —
        # consider building a parameterized IN list instead.
        sql = "select score,diary_id from biz_feed_diary_score where diary_id in {};".format(diary_list)
        cursor.execute(sql)
        result = cursor.fetchall()
    finally:
        # Close on every path (original leaked on the empty branch).
        db.close()
    score_df = pd.DataFrame(list(result)).dropna()
    if score_df.empty:
        print("获取的日记列表是空")
        return False
    # BUG fixed: rename() returns a new frame — the original discarded the
    # result, so downstream access to score_df["cid"] would KeyError.
    score_df = score_df.rename(columns={0: "score", 1: "cid"})
    print("日记打分表")
    print(score_df.head(1))
    return score_df
def update_dairy_queue(score_df, predict_score_df):
    """Merge offline scores with fresh model scores and return the re-ranked
    cid list.

    Every 5th diary of the incoming order (indices 1, 6, 11, ...) is treated
    as a video diary — assumes the upstream queue builder placed videos at
    those slots (TODO confirm). Videos are re-interleaved into the non-video
    ranking at positions 1, 4, 7, ...

    Args:
        score_df: DataFrame with "cid" and "score" (offline scores).
        predict_score_df: DataFrame with "cid" and "score" (model scores).

    Returns:
        List of cids, best first, with video diaries interleaved.
    """
    diary_id = score_df["cid"].values.tolist()
    # Equivalent to the original while-loop: take indices 1, 6, 11, ...
    video_id = diary_id[1::5]
    if len(video_id) > 0:
        not_video = list(set(diary_id) - set(video_id))
        # 为了相加时,cid能够匹配,先把cid变成索引 — index by cid so the two
        # score columns align when added.
        not_video_df = score_df.loc[score_df["cid"].isin(not_video)].set_index(["cid"])
        not_video_predict_df = predict_score_df.loc[predict_score_df["cid"].isin(not_video)].set_index(["cid"])
        not_video_df["score"] = not_video_df["score"] + not_video_predict_df["score"]
        not_video_df = not_video_df.sort_values(by="score", ascending=False)
        video_df = score_df.loc[score_df["cid"].isin(video_id)].set_index(["cid"])
        video_predict_df = predict_score_df.loc[predict_score_df["cid"].isin(video_id)].set_index(["cid"])
        video_df["score"] = video_df["score"] + video_predict_df["score"]
        video_df = video_df.sort_values(by="score", ascending=False)
        not_video_id = not_video_df.index.tolist()
        video_id = video_df.index.tolist()
        new_queue = not_video_id
        i = 1
        for j in video_id:
            new_queue.insert(i, j)
            # TODO 下面的3是测试用的,如果上线后,把3改成5
            i += 3
        print("分数合并成功")
        return new_queue
    # 如果没有视频日记: just add the two score columns and sort.
    else:
        score_df = score_df.set_index(["cid"])
        predict_score_df = predict_score_df.set_index(["cid"])
        score_df["score"] = score_df["score"] + predict_score_df["score"]
        score_df = score_df.sort_values(by="score", ascending=False)
        print("1分数合并成功")
        return score_df.index.tolist()
def update_sql_dairy_queue(queue_name, diary_id, device_id, city_id):
    """Write the re-ranked diary ids back into device_diary_queue.<queue_name>.

    Args:
        queue_name: column to update (one of the four queue columns).
        diary_id: list of "diary|<id>" strings; the prefix is stripped and
            the ids stored as a single comma-joined string.
        device_id, city_id: row selector.
    """
    db = pymysql.connect(host='rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com', port=3306, user='work',
                         passwd='workwork', db='doris_test')
    cursor = db.cursor()
    # 去除diary_id 前面的"diary|" — join with commas. The leftover
    # json.dumps variant produced a JSON *string* whose characters were then
    # concatenated one by one; this builds the intended "id1,id2,..." value.
    id_str = ",".join(str(x)[6:] for x in diary_id)
    print("写入前")
    print(id_str[:80])
    # NOTE(review): queue_name comes from a fixed whitelist in user_update;
    # the other values are still string-interpolated — parameterize if any
    # input can ever be untrusted.
    sql = "update device_diary_queue set {}='{}' where device_id = '{}' and city_id = '{}'".format(
        queue_name, id_str, device_id, city_id)
    cursor.execute(sql)
    db.commit()
    db.close()
    print("成功写入diaryid")
def multi_update(key, name_dict,native_queue_list):
if name_dict[key] != []:
diary_id = predict(key, name_dict[key])
# 更新前获取最新的native_queue (re-read native_queue right before writing)
def get_native_queue(device_id, city_id):
    """Return the current native queue ("diary|<id>" strings) for the device
    in this city, or None when no row exists.

    Used as an optimistic-concurrency check: the caller compares this against
    its earlier snapshot before overwriting the queue.
    """
    db = pymysql.connect(host='rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com', port=3306, user='work',
                         passwd='workwork', db='doris_test')
    try:
        cursor = db.cursor()
        # Parameterized instead of str.format to avoid SQL injection.
        sql = "select native_queue from device_diary_queue where device_id = %s and city_id = %s;"
        cursor.execute(sql, (device_id, city_id))
        result = cursor.fetchall()
    finally:
        # Original leaked the connection when the result was empty.
        db.close()
    df = pd.DataFrame(list(result))
    if df.empty:
        return None
    return ["diary|" + str(x) for x in df.loc[0, 0].split(",")]
if get_native_queue(device_id) == native_queue_list:
update_sql_dairy_queue(key, diary_id)
print("更新结束")
else:
print("不需要更新日记队列")
def get_queue(device_id, city_id, queue_name):
    """Read one of the four diary queues for (device_id, city_id).

    Returns a list of "diary|<id>" strings, or False when the device/city
    has no queue row.
    """
    db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris',
                         passwd='o5gbA27hXHHm', db='doris_prod')
    try:
        cursor = db.cursor()
        # queue_name is a column name and cannot be a bound parameter; it is
        # always one of the four names hard-coded in user_update. The row
        # selectors are parameterized.
        sql = ("select {} from device_diary_queue "
               "where device_id = %s and city_id = %s;").format(queue_name)
        cursor.execute(sql, (device_id, city_id))
        result = cursor.fetchall()
    finally:
        # Original leaked the connection on the empty branch.
        db.close()
    df = pd.DataFrame(list(result))
    if df.empty:
        print("该用户对应的日记队列为空")
        return False
    return ["diary|" + str(x) for x in df.loc[0, 0].split(",")]
def user_update(device_id):
not_empty,native_queue_list, nearby_queue_list, nation_queue_list, megacity_queue_list = test_con_sql(device_id)
if not_empty:
name_dict = {"native_queue": native_queue_list, "nearby_queue": nearby_queue_list,
"nation_queue": nation_queue_list, "megacity_queue": megacity_queue_list}
pool = Pool(1)
for key in name_dict.keys():
pool.apply_async(multi_update, (key, name_dict,native_queue_list,))
pool.close()
pool.join()
if __name__ == "__main__":
# while True:
empty,device_id_list = get_active_users()
if empty:
for eachFile in os.listdir("/tmp"):
if "xlearn" in eachFile:
os.remove("/tmp" + "/" + eachFile)
time.sleep(58)
def pipe_line(queue_name, queue_arg, device_id, city_id):
    """Score one queue end-to-end (model predict → scale → merge with the
    offline score table) and return the re-ranked cid list, or False when
    the offline table had no scores for these diaries."""
    predict(queue_name, queue_arg, device_id, city_id)
    predict_score_df = save_result(queue_name, queue_arg, device_id)
    # BUG fixed: get_score takes (queue_arg, predict_score_df) but was called
    # with a single argument (TypeError at runtime).
    score_df = get_score(queue_arg, predict_score_df)
    # BUG fixed: `if score_df:` raises ValueError on a DataFrame (ambiguous
    # truthiness) — compare against the False sentinel explicitly.
    if score_df is False:
        return False
    return update_dairy_queue(score_df, predict_score_df)
def user_update(device_id, city_id, data_set_cid):
    """Re-rank every diary queue of one device and write each back, unless
    the native queue changed underneath us in the meantime.

    Args:
        device_id, city_id: row selector for device_diary_queue.
        data_set_cid: cids known to the model ("diary|<id>" strings); only
            these can be scored, the rest are appended with score 0.
    """
    queue_name_list = ["native_queue", "nearby_queue", "nation_queue", "megacity_queue"]
    # Snapshot of native_queue taken on the first iteration; a plain local
    # suffices — the original's `global` was only read within this call.
    native_queue_list = None
    for queue_name in queue_name_list:
        queue_list = get_queue(device_id, city_id, queue_name)
        if queue_name == "native_queue":
            native_queue_list = queue_list
        if not queue_list:  # False (no row) or empty queue
            print("日记队列为空")
            continue
        queue_predict = list(set(queue_list) & set(data_set_cid))
        queue_not_predict = list(set(queue_list) - set(data_set_cid))
        if queue_predict == []:
            print("预测集是空,不需要预测")
            continue
        queue_arg = [queue_predict, queue_not_predict, queue_list]
        diary_queue = pipe_line(queue_name, queue_arg, device_id, city_id)
        # Only write when scoring succeeded AND nobody changed native_queue
        # since we started (optimistic concurrency check).
        if diary_queue and (native_queue_list == get_native_queue(device_id, city_id)):
            update_sql_dairy_queue(queue_name, diary_queue, device_id, city_id)
            print("更新结束")
        else:
            print("日记队列已更新不需要更新日记队列,或者日记队列为空")
if predict_list != []:
for device_id in predict_list:
user_update(device_id)
else:
print("该列表是新用户,不需要预测")
end = time.time()
print("在不在")
print("358035085192742" in predict_list)
print("AB20292B-5D15-4C44-9429-1C2FF5ED26F6" in predict_list)
print("B2F0665E-4375-4169-8FE3-8A26A1CFE248" in predict_list)
print(predict_list)
print(end - start)
def run():
    """One scheduling pass: load the feature cid set, then refresh the diary
    queues of each active (device_id, city_id) pair."""
    # TODO 如果测刘潇的没问题,去掉下面代码的注释
    # device_city_list = get_active_users()
    # TODO 先测一下高雅喆的,如果没问题,然后再测刘潇的
    # BUG fixed: (("id", "beijing")) is just a 2-tuple of strings — the loop
    # iterated the two strings character by character. It must be a list
    # containing one (device_id, city_id) pair.
    device_city_list = [("AB20292B-5D15-4C44-9429-1C2FF5ED26F6", "beijing")]
    start = time.time()
    warnings.filterwarnings("ignore")
    data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist()
    for device_city in device_city_list:
        user_update(device_city[0], device_city[1], data_set_cid)
    end = time.time()
    # `end` was previously computed and never used.
    print("本次更新耗时{}秒".format(end - start))


if __name__ == "__main__":
    # todo 正式上线后把下面while True的代码加上
    # while True:
    run()
# # TODO 上线后把预测用户改成多进程预测
# # TODO 上线后把预测用户改成多进程预测
# data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist()
#
# device_id = "358035085192742"
# native_queue_list, nearby_queue_list, nation_queue_list, megacity_queue_list = test_con_sql(device_id)
# name_dict = {"native_queue": native_queue_list, "nearby_queue": nearby_queue_list,
# "nation_queue": nation_queue_list, "megacity_queue": megacity_queue_list}
#
# for key in name_dict.keys():
# multi_update(key, name_dict)
# predict(key, name_dict[key])
# score_df = save_result(key, name_dict[key])
# score_df = merge_score(name_dict[key], score_df)
# diary_id = update_dairy_queue(score_df)
......@@ -172,7 +172,7 @@ def update_dairy_queue(score_df,predict_score_df):
return score_df.index.tolist()
def update_sql_dairy_queue(queue_name, diary_id,device_city):
def update_sql_dairy_queue(queue_name, diary_id,device_id, city_id):
db = pymysql.connect(host='rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com', port=3306, user='work',
passwd='workwork', db='doris_test')
cursor = db.cursor()
......@@ -182,7 +182,7 @@ def update_sql_dairy_queue(queue_name, diary_id,device_city):
print("写入前")
print(id_str[:80])
sql = "update device_diary_queue set {}='{}' where device_id = '{}' and city_id = '{}'".format\
(queue_name,id_str,device_city[0],device_city[1])
(queue_name,id_str,device_id, city_id)
cursor.execute(sql)
db.commit()
db.close()
......@@ -211,10 +211,10 @@ def get_native_queue(device_id,city_id):
def multi_update(queue_name,queue_arg,device_id,city_id):
if queue_arg[0] != []:
diary_id = predict(queue_name,queue_arg,device_id,city_id)
update_sql_dairy_queue(queue_name, diary_id,device_id,city_id)
print("更新结束")
return diary_id
else:
print("预测集是空,不需要预测")
return False
def get_queue(device_id, city_id,queue_name):
......@@ -227,35 +227,33 @@ def get_queue(device_id, city_id,queue_name):
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
if not df.empty:
queue_list = df.loc[0,0].split(",")
queue_list = list(map(lambda x: "diary|" + str(x), queue_list))
db.close()
return True, queue_list
return queue_list
else:
print("该用户对应的日记队列为空")
return False, []
return False
def user_update(device_id,city_id):
global native_queue_list
queue_name_list = ["native_queue","nearby_queue","nation_queue","megacity_queue"]
for queue_name in queue_name_list:
exist,queue_list = get_queue(device_id, city_id,queue_name)
# 下面的代码是用来对比native_queue是否发生变化,如果发生了变化,就不更新日记队列了
# if queue_name == "native_queue":
# native_queue_list =
if exist:
queue_predcit = list(set(queue_list) & set(data_set_cid))
queue_not_predcit = list(set(queue_list) - set(data_set_cid))
queue_arg = [queue_predcit,queue_not_predcit,queue_list]
multi_update(queue_name,queue_arg,device_id,city_id)
queue_list = get_queue(device_id, city_id,queue_name)
if queue_name == "native_queue":
native_queue_list = queue_list
if queue_list:
queue_predict = list(set(queue_list) & set(data_set_cid))
queue_not_predict = list(set(queue_list) - set(data_set_cid))
queue_arg = [queue_predict,queue_not_predict,queue_list]
diary_id = multi_update(queue_name, queue_arg, device_id, city_id)
if diary_id and (native_queue_list == get_native_queue(device_id,city_id)):
update_sql_dairy_queue(queue_name, diary_id, device_id, city_id)
print("更新结束")
else:
print("不需要更新日记队列")
else:
print("日记队列为空")
......
from utils import con_sql
from datetime import datetime
from config import *
import pandas as pd
import os
import time
# 获取当下一分钟内活跃用户 (active users within the current minute)
def get_active_users():
    """Return the (device_id, city_id) pairs to predict for this minute,
    or False when there is nothing to do.

    NOTE(review): city_id may be "" for devices without a city — confirm
    downstream handling.
    """
    now = datetime.now()
    # Minute-window boundaries, e.g. "2018-09-03 12:34:00" / ":59"; used by
    # the production SQL below.
    now_start = str(now)[:16] + ":00"
    now_end = str(now)[:16] + ":59"
    sql = "select device_id,city_id from user_active_time limit 1;"
    # TODO 正式上线后用下面的sql语句
    # sql = "select device_id,city_id from user_active_time " \
    #       "where active_time <= '{}' and active_time >= '{}'".format(now_end, now_start)
    df = con_sql(sql)
    if df.empty:
        print("当下这一分钟没有活跃用户,不需要预测")
        # Clean xlearn temp files and wait out the rest of the minute.
        for each_file in os.listdir("/tmp"):
            if "xlearn" in each_file:
                os.remove("/tmp" + "/" + each_file)
        time.sleep(58)
        return False
    df = df.rename(columns={0: "device_id", 1: "city_id"})
    old_device_id_list = pd.read_csv(DIRECTORY_PATH + "data_set_device_id.csv")["device_id"].values.tolist()
    # 求活跃用户和老用户的交集,也就是只预测老用户
    df = df.loc[df["device_id"].isin(old_device_id_list)]
    if df.empty:
        print("该列表是新用户,不需要预测")
        # BUG fixed: this branch fell through and implicitly returned None.
        return False
    # TODO 正式上线后注释下面的只预测尾号是6的代码
    # 只预测尾号是6的ID,这块是测试要求的,这块也可以在数据库取数据时过滤一下
    device_temp_list = df["device_id"].values.tolist()
    predict_list = list(filter(lambda x: str(x)[-1] == "6", device_temp_list))
    df = df.loc[df["device_id"].isin(predict_list)]
    if df.empty:
        print("没有尾号是6的用户,不需要预测")
        # BUG fixed: this branch also fell through returning None.
        return False
    device_list = df["device_id"].values.tolist()
    city_list = df["city_id"].values.tolist()
    return list(zip(device_list, city_list))
def fetch_user_profile(device_id):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment