Commit 4cecec77 authored by 张彦钊

update test file

parent 49725eaf
...@@ -47,21 +47,21 @@ def feature_en(x_list, device_id): ...@@ -47,21 +47,21 @@ def feature_en(x_list, device_id):
# 把ffm.pkl load进来,将上面的表转化为ffm格式 # 把ffm.pkl load进来,将上面的表转化为ffm格式
def transform_ffm_format(df, queue_name, device_id=None):
    """Convert the feature table to FFM format and write it to a CSV file.

    Loads the pickled transformer from ``/Users/mac/utils/ffm.pkl``, applies
    its ``native_transform`` to ``df`` and saves the result (no index, no
    header) as ``/Users/mac/utils/result/<queue_name>.csv``.

    Args:
        df: Feature table handed to the transformer's ``native_transform``.
        queue_name: Name of the diary queue; used as the output file stem.
        device_id: Not used by the body. It defaults to ``None`` so that
            two-argument calls ``transform_ffm_format(data, queue_name)``
            keep working.

    Returns:
        The path of the CSV file that was written.
    """
    # NOTE(review): pickle.load runs arbitrary code stored in the file; only
    # load ffm.pkl from a trusted location.
    # The local test path is hard-coded; a DIRECTORY_PATH-based path is
    # presumably intended for deployment — TODO confirm.
    with open("/Users/mac/utils/ffm.pkl", "rb") as f:
        ffm_format_pandas = pickle.load(f)
    data = ffm_format_pandas.native_transform(df)
    predict_file_name = "/Users/mac/utils/result/{0}.csv".format(queue_name)
    data.to_csv(predict_file_name, index=False, header=None)
    print("done ffm")
    return predict_file_name
# 将模型加载,预测 # 将模型加载,预测
def predict(queue_name, name_dict): def predict(queue_name,queue_arg,device_id,city_id):
data = feature_en(name_dict[queue_name][0], device_city[0]) data = feature_en(queue_arg[0], device_id)
data_file_path = transform_ffm_format(data,queue_name) data_file_path = transform_ffm_format(data,queue_name)
ffm_model = xl.create_ffm() ffm_model = xl.create_ffm()
...@@ -71,27 +71,26 @@ def predict(queue_name, name_dict): ...@@ -71,27 +71,26 @@ def predict(queue_name, name_dict):
ffm_model.predict("/Users/mac/utils/model.out", ffm_model.predict("/Users/mac/utils/model.out",
"/Users/mac/utils/result/{0}_output.txt".format(queue_name)) "/Users/mac/utils/result/{0}_output.txt".format(queue_name))
# ffm_model.predict(DIRECTORY_PATH + "model.out", # ffm_model.predict(DIRECTORY_PATH + "model.out",
# DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_city[0], queue_name)) # DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id, queue_name))
return save_result(queue_name, name_dict) return save_result(queue_name,queue_arg,device_id)
def save_result(queue_name, name_dict): def save_result(queue_name,queue_arg,device_id):
# score_df = pd.read_csv(DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_city[0], queue_name), header=None) # score_df = pd.read_csv(DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id, queue_name), header=None)
score_df = pd.read_csv("/Users/mac/utils/result/{0}_output.txt".format(queue_name), header=None) score_df = pd.read_csv("/Users/mac/utils/result/{0}_output.txt".format(queue_name), header=None)
# print(score_df) # print(score_df)
mm_scaler = MinMaxScaler() mm_scaler = MinMaxScaler()
mm_scaler.fit(score_df) mm_scaler.fit(score_df)
score_df = pd.DataFrame(mm_scaler.transform(score_df)) score_df = pd.DataFrame(mm_scaler.transform(score_df))
score_df = score_df.rename(columns={0: "score"}) score_df = score_df.rename(columns={0: "score"})
score_df["cid"] = name_dict[queue_name][0] score_df["cid"] = queue_arg[0]
# 去掉cid前面的"diary|" # 去掉cid前面的"diary|"
score_df["cid"] = score_df["cid"].apply(lambda x:x[6:]) score_df["cid"] = score_df["cid"].apply(lambda x:x[6:])
print("score_df:") print("score_df:")
print(score_df.head(1)) print(score_df.head(1))
print(score_df.shape) print(score_df.shape)
if queue_arg[1] != []:
df_temp = pd.DataFrame(name_dict[queue_name][1]).rename(columns={0: "cid"}) df_temp = pd.DataFrame(queue_arg[1]).rename(columns={0: "cid"})
df_temp["score"] = 0 df_temp["score"] = 0
df_temp = df_temp.sort_index(axis=1,ascending=False) df_temp = df_temp.sort_index(axis=1,ascending=False)
df_temp["cid"] = df_temp["cid"].apply(lambda x: x[6:]) df_temp["cid"] = df_temp["cid"].apply(lambda x: x[6:])
...@@ -104,25 +103,27 @@ def save_result(queue_name, name_dict): ...@@ -104,25 +103,27 @@ def save_result(queue_name, name_dict):
print(predict_score_df.head(1)) print(predict_score_df.head(1))
print(predict_score_df.shape) print(predict_score_df.shape)
return merge_score(queue_name, name_dict, predict_score_df) return merge_score(queue_name, queue_arg, predict_score_df)
else:
return merge_score(queue_name, queue_arg, score_df)
def merge_score(queue_name, queue_arg, predict_score_df):
    """Fetch the stored diary scores and merge them with the predicted ones.

    Args:
        queue_name: Name of the diary queue (not used by the body).
        queue_arg: Sequence whose element ``[2]`` is the full list of diary
            ids, each prefixed with ``"diary|"``.
        predict_score_df: Frame of predicted scores, passed through unchanged
            to ``update_dairy_queue``.

    Returns:
        Whatever ``update_dairy_queue(score_df, predict_score_df)`` returns.
    """
    # Strip the leading "diary|" (6 characters) from every diary id.
    diary_list = tuple(cid[6:] for cid in queue_arg[2])
    print(diary_list)

    columns = ["score", "cid"]
    if not diary_list:
        # "IN ()" is not valid SQL, so skip the query for an empty id list.
        score_df = pd.DataFrame(columns=columns)
    else:
        # NOTE(review): credentials are hard-coded; consider moving them to
        # configuration.
        db = pymysql.connect(host='rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com', port=3306, user='work',
                             passwd='workwork', db='zhengxing_test')
        try:
            cursor = db.cursor()
            # One placeholder per id: formatting the Python tuple directly
            # produced "('1',)" for a single id, which MySQL rejects.
            placeholders = ",".join(["%s"] * len(diary_list))
            sql = "select score,diary_id from biz_feed_diary_score where diary_id in ({});".format(placeholders)
            cursor.execute(sql, diary_list)
            result = cursor.fetchall()
        finally:
            # Close the connection even when the query raises.
            db.close()
        # Explicit column names keep "score"/"cid" present for empty results.
        score_df = pd.DataFrame(list(result), columns=columns)

    print("日记打分表")
    print(score_df.head(2))
    return update_dairy_queue(score_df, predict_score_df)
...@@ -135,21 +136,23 @@ def update_dairy_queue(score_df,predict_score_df): ...@@ -135,21 +136,23 @@ def update_dairy_queue(score_df,predict_score_df):
while x < len(diary_id): while x < len(diary_id):
video_id.append(diary_id[x]) video_id.append(diary_id[x])
x += 5 x += 5
if len(video_id)>0: if len(video_id)>0:
not_video = list(set(diary_id) - set(video_id)) not_video = list(set(diary_id) - set(video_id))
# 为了相加时,cid能够匹配,先把cid变成索引,相加后,再把cid恢复成列 # 为了相加时,cid能够匹配,先把cid变成索引,相加后,再把cid恢复成列
not_video_df = score_df.loc[score_df["cid"].isin(not_video)].reset_index(["cid"])
not_video_predict_df = predict_score_df.loc[predict_score_df["cid"].isin(not_video)].reset_index(["cid"])
not_video_df["score"]=not_video_df["score"]+not_video_predict_df["score"]
not_video_df = not_video_df.reset_index().sort_values(by="score", ascending=False)
video_df = score_df.loc[score_df["cid"].isin(video_id)].reset_index(["cid"]) not_video_df = score_df.loc[score_df["cid"].isin(not_video)].set_index(["cid"])
video_predict_df = predict_score_df.loc[predict_score_df["cid"].isin(video_id)].reset_index(["cid"]) not_video_predict_df = predict_score_df.loc[predict_score_df["cid"].isin(not_video)].set_index(["cid"])
not_video_df["score"] = not_video_df["score"] + not_video_predict_df["score"]
not_video_df = not_video_df.sort_values(by="score", ascending=False)
video_df = score_df.loc[score_df["cid"].isin(video_id)].set_index(["cid"])
video_predict_df = predict_score_df.loc[predict_score_df["cid"].isin(video_id)].set_index(["cid"])
video_df["score"] = video_df["score"] + video_predict_df["score"] video_df["score"] = video_df["score"] + video_predict_df["score"]
video_df = video_df.reset_index().sort_values(by="score", ascending=False) video_df = video_df.sort_values(by="score", ascending=False)
not_video_id = not_video_df["cid"].values.tolist() not_video_id = not_video_df.index.tolist()
video_id = video_df["cid"].values.tolist() video_id = video_df.index.tolist()
diary_id = not_video_id diary_id = not_video_id
i = 1 i = 1
for j in video_id: for j in video_id:
...@@ -157,18 +160,19 @@ def update_dairy_queue(score_df,predict_score_df): ...@@ -157,18 +160,19 @@ def update_dairy_queue(score_df,predict_score_df):
# TODO 下面的3是测试用的,如果上线后,把3改成5 # TODO 下面的3是测试用的,如果上线后,把3改成5
i += 3 i += 3
print("分数合并成功")
return diary_id return diary_id
# 如果没有视频日记 # 如果没有视频日记
else: else:
score_df = score_df.reset_index(["cid"]) score_df = score_df.set_index(["cid"])
predict_score_df = predict_score_df.reset_index(["cid"]) predict_score_df = predict_score_df.set_index(["cid"])
score_df["score"]=score_df["score"]+predict_score_df["score"] score_df["score"]=score_df["score"]+predict_score_df["score"]
score_df = score_df.sort_values(by="score", ascending=False) score_df = score_df.sort_values(by="score", ascending=False)
print("1分数合并成功")
return score_df["cid"].values.tolist() return score_df.index.tolist()
def update_sql_dairy_queue(queue_name, diary_id): def update_sql_dairy_queue(queue_name, diary_id,device_city):
db = pymysql.connect(host='rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com', port=3306, user='work', db = pymysql.connect(host='rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com', port=3306, user='work',
passwd='workwork', db='doris_test') passwd='workwork', db='doris_test')
cursor = db.cursor() cursor = db.cursor()
...@@ -178,7 +182,7 @@ def update_sql_dairy_queue(queue_name, diary_id): ...@@ -178,7 +182,7 @@ def update_sql_dairy_queue(queue_name, diary_id):
print("写入前") print("写入前")
print(id_str[:80]) print(id_str[:80])
sql = "update device_diary_queue set {}='{}' where device_id = '{}' and city_id = '{}'".format\ sql = "update device_diary_queue set {}='{}' where device_id = '{}' and city_id = '{}'".format\
(queue_name, diary_id, device_city[0],device_city[1]) (queue_name,id_str,device_city[0],device_city[1])
cursor.execute(sql) cursor.execute(sql)
db.commit() db.commit()
db.close() db.close()
...@@ -204,75 +208,54 @@ def get_native_queue(device_id,city_id): ...@@ -204,75 +208,54 @@ def get_native_queue(device_id,city_id):
return None return None
def multi_update(queue_name, queue_arg, device_id, city_id):
    """Predict a new order for one diary queue and write it to the database.

    Args:
        queue_name: Name of the queue column to update.
        queue_arg: Sequence ``[to_predict, not_to_predict, full_queue]``;
            only element ``[0]`` is inspected here, the whole sequence is
            forwarded to ``predict``.
        device_id: Device identifier of the user.
        city_id: City identifier of the user.

    Returns:
        None. Prints a message describing what was done.
    """
    if not queue_arg[0]:
        print("预测集是空,不需要预测")
        return

    diary_id = predict(queue_name, queue_arg, device_id, city_id)
    # update_sql_dairy_queue takes a single (device_id, city_id) pair as its
    # third parameter and indexes it with [0] and [1].
    update_sql_dairy_queue(queue_name, diary_id, (device_id, city_id))
    print("更新结束")
def get_queue(device_id, city_id, queue_name):
    """Read one diary queue of a device/city pair from ``device_diary_queue``.

    Args:
        device_id: Device identifier used in the ``where`` clause.
        city_id: City identifier used in the ``where`` clause.
        queue_name: Name of the column to select. It is interpolated into
            the SQL text, so it must come from a fixed, trusted list.

    Returns:
        ``(True, queue_list)`` where every id in the comma-separated column
        value is prefixed with ``"diary|"``, or ``(False, [])`` when no row
        matches or the stored value is NULL/empty.
    """
    # NOTE(review): credentials are hard-coded; consider moving them to
    # configuration.
    db = pymysql.connect(host='rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com', port=3306, user='work',
                         passwd='workwork', db='doris_test')
    try:
        cursor = db.cursor()
        # Column names cannot be bound as parameters; the values can.
        sql = "select {} from device_diary_queue " \
              "where device_id = %s and city_id = %s;".format(queue_name)
        cursor.execute(sql, (device_id, city_id))
        row = cursor.fetchone()
    finally:
        # Close the connection on every path, including the empty one.
        db.close()

    raw_queue = row[0] if row else None
    if not raw_queue:
        print("该用户对应的日记队列为空")
        return False, []

    queue_list = ["diary|" + str(x) for x in raw_queue.split(",")]
    return True, queue_list
def user_update(device_id, city_id):
    """Re-rank all four diary queues of one device/city pair.

    For each queue the stored ids are split into those present in the
    module-level ``data_set_cid`` (to be predicted) and those that are not,
    and the three lists are handed to ``multi_update``.

    Args:
        device_id: Device identifier of the user.
        city_id: City identifier of the user.
    """
    queue_name_list = ["native_queue", "nearby_queue", "nation_queue", "megacity_queue"]
    # Build the lookup set once instead of twice per queue.
    known_cids = set(data_set_cid)

    for queue_name in queue_name_list:
        exist, queue_list = get_queue(device_id, city_id, queue_name)
        # TODO: compare native_queue with its earlier value and skip the
        # update when it has changed (not implemented yet).
        if exist:
            queue_ids = set(queue_list)
            queue_predcit = list(queue_ids & known_cids)
            queue_not_predcit = list(queue_ids - known_cids)
            queue_arg = [queue_predcit, queue_not_predcit, queue_list]
            multi_update(queue_name, queue_arg, device_id, city_id)
        else:
            print("日记队列为空")
...@@ -306,8 +289,8 @@ if __name__ == "__main__": ...@@ -306,8 +289,8 @@ if __name__ == "__main__":
data_set_cid = pd.read_csv("/Users/mac/utils/data_set_cid.csv")["cid"].values.tolist() data_set_cid = pd.read_csv("/Users/mac/utils/data_set_cid.csv")["cid"].values.tolist()
device_city_list = [("356156075348110","tianjin")] device_city_list = [("356156075348110","tianjin")]
if device_city_list != []: if device_city_list != []:
for device_city in device_city_list: for i in device_city_list:
user_update(device_city[0], device_city[1]) user_update(i[0], i[1])
else: else:
print("该列表是新用户,不需要预测") print("该列表是新用户,不需要预测")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment