import pickle
import xlearn as xl
import pandas as pd
import pymysql
from datetime import datetime
# utils must stay imported even though it is never referenced directly: the pickled
# FFM transformer was written from the utils module, so unpickling fails without it
import utils
import warnings
from multiprocessing import Pool
from config import *
import json


def test_con_sql(device_id):
    db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris',
                         passwd='o5gbA27hXHHm', db='doris_prod')
    cursor = db.cursor()
    sql = "select native_queue,nearby_queue,nation_queue,megacity_queue from device_diary_queue " \
          "where device_id = '{}';".format(device_id)
    cursor.execute(sql)
    result = cursor.fetchall()
    db.close()
    df = pd.DataFrame(list(result))
    if not df.empty:
        df = df.rename(columns={0: "native_queue", 1: "nearby_queue", 2: "nation_queue", 3: "megacity_queue"})
        # Prefix every diary id with "diary|" and keep only ids present in the training
        # cid set. The set intersection drops duplicates and does not preserve order,
        # which is acceptable because every queue is re-sorted by score later.
        queues = []
        for col in ["native_queue", "nearby_queue", "nation_queue", "megacity_queue"]:
            queue = df.loc[0, col].split(",")
            queue = list(map(lambda x: "diary|" + str(x), queue))
            queues.append(list(set(queue) & set(data_set_cid)))
        print("Fetched diary queues successfully")
        return queues[0], queues[1], queues[2], queues[3]
    else:
        print("Diary queues for this device are empty")
        return [], [], [], []


# Fetch the latest native_queue right before writing back
def get_native_queue(device_id):
    db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris',
                         passwd='o5gbA27hXHHm', db='doris_prod')
    cursor = db.cursor()
    sql = "select native_queue from device_diary_queue where device_id = '{}';".format(device_id)
    cursor.execute(sql)
    result = cursor.fetchall()
    db.close()
    df = pd.DataFrame(list(result))
    if not df.empty:
        native_queue = df.loc[0, 0].split(",")
        native_queue = list(map(lambda x: "diary|" + str(x), native_queue))
        native_queue = list(set(native_queue) & set(data_set_cid))
        print("Fetched native_queue successfully")
        return native_queue
    else:
        return None


# Attach device_id and the current time to the candidate diaries from the city queues.
# Note: the feature order of the prediction set must stay consistent with the training set.
def feature_en(x_list, device_id):
    data = pd.DataFrame(x_list)
    # The column must be named "cid", not "diaryid": the FFM transformer used by
    # the prediction model was fitted on a column called "cid"
    data = data.rename(columns={0: "cid"})
    data["device_id"] = device_id
    now = datetime.now()
    data["hour"] = now.hour
    data["minute"] = now.minute
    # Hour 0 is encoded as 24 and minute 0 as 60, matching the training-set encoding
    data.loc[data["hour"] == 0, ["hour"]] = 24
    data.loc[data["minute"] == 0, ["minute"]] = 60
    data["hour"] = data["hour"].astype("category")
    data["minute"] = data["minute"].astype("category")
    # y is the prediction target, but the FFM transformer still requires a y column;
    # the placeholder value does not affect the predictions
    data["y"] = 0
    print("done feature engineering")
    return data


# Load ffm.pkl and convert the DataFrame above into FFM format
def transform_ffm_format(df, queue_name):
    with open(DIRECTORY_PATH + "ffm.pkl", "rb") as f:
        ffm_format_pandas = pickle.load(f)
    data = ffm_format_pandas.transform(df)
    # device_id is the module-level variable set in __main__
    predict_file_name = DIRECTORY_PATH + "result/{0}_{1}.csv".format(device_id, queue_name)
    data.to_csv(predict_file_name, index=False, header=None)
    print("done ffm")
    return predict_file_name


# Load the trained model and score the queue
def predict(queue_name, x_list):
    data = feature_en(x_list, device_id)
    data_file_path = transform_ffm_format(data, queue_name)
    ffm_model = xl.create_ffm()
    ffm_model.setTest(data_file_path)
    ffm_model.setSigmoid()
    ffm_model.predict(DIRECTORY_PATH + "model.out",
                      DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id, queue_name))
print("done predict") def save_result(queue_name, x_list): score_df = pd.read_csv(DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id,queue_name), header=None) score_df = score_df.rename(columns={0: "score"}) score_df["cid"] = x_list score_df = score_df.sort_values(by="score", ascending=False) print("概率前十行:") print(score_df) return score_df def merge_score(x_list, score_df): db = pymysql.connect(host='10.66.157.22', port=4000, user='root',passwd='3SYz54LS9#^9sBvC', db='eagle') cursor = db.cursor() # 去除diary_id 前面的"diary|" x_list = tuple(list(map(lambda x:x[6:],x_list))) # TODO 把id也取下来,这样可以解决分数不匹配的问题 sql = "select score from biz_feed_diary_score where diary_id in {};".format(x_list) cursor.execute(sql) result = cursor.fetchall() score = pd.DataFrame(list(result)) print("数据库日记表前十行") print(score) score_list = score[0].values.tolist() db.close() score_df["score"] = score_df["score"] + score_list return score_df def update_dairy_queue(score_df): diary_id = score_df["cid"].values.tolist() video_id = [] x = 1 while x < len(diary_id): video_id.append(diary_id[x]) x += 5 if len(video_id)>0: not_video_id = list(set(diary_id) - set(video_id)) not_video_id_df = score_df.loc[score_df["cid"].isin(not_video_id)] not_video_id_df = not_video_id_df.sort_values(by="score", ascending=False) video_id_df = score_df.loc[score_df["cid"].isin(video_id)] video_id_df = video_id_df.sort_values(by="score", ascending=False) not_video_id = not_video_id_df["cid"].values.tolist() video_id = video_id_df["cid"].values.tolist() diary_id = not_video_id i = 1 for j in video_id: diary_id.insert(i, j) i += 5 return diary_id else: score_df = score_df.sort_values(by="score", ascending=False) return score_df["cid"].values.tolist() def update_sql_dairy_queue(queue_name, diary_id): db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris', passwd='o5gbA27hXHHm', db='doris_prod') cursor = db.cursor() ## 去除diary_id 前面的"diary|" diary_id = json.dumps(list(map(lambda x:x[6:],diary_id))) sql = "update device_diary_queue set {}='{}' where device_id = '{}'".format(queue_name, diary_id, device_id) cursor.execute(sql) db.close() print("成功写入") def multi_update(key, name_dict): predict(key, name_dict[key]) score_df = save_result(key, name_dict[key]) score_df = merge_score(name_dict[key], score_df) diary_id = update_dairy_queue(score_df) if get_native_queue(device_id) == native_queue_list: update_sql_dairy_queue(key, diary_id) print("更新结束") else: print("不需要更新日记队列") if __name__ == "__main__": warnings.filterwarnings("ignore") # TODO 上线后把预测用户改成多进程预测 data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist() device_id = "358035085192742" native_queue_list, nearby_queue_list, nation_queue_list, megacity_queue_list = test_con_sql(device_id) name_dict = {"native_queue": native_queue_list, "nearby_queue": nearby_queue_list, "nation_queue": nation_queue_list, "megacity_queue": megacity_queue_list} for key in name_dict.keys(): multi_update(key, name_dict) # pool = Pool(4) # for key in name_dict.keys(): # pool.apply_async(multi_update,(key,name_dict,)) # pool.close() # pool.join()