import pickle
import xlearn as xl
import pandas as pd
import pymysql
from datetime import datetime
# The utils package must be imported; otherwise pickle cannot find utils when
# loading the ffm transformer and will raise an error.
import utils
import warnings
from multiprocessing import Pool
import json
from sklearn.preprocessing import MinMaxScaler
import time
# from userProfile import get_active_users
import os


def get_video_id():
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='eagle')
    cursor = db.cursor()
    sql = "select diary_id from feed_diary_boost;"
    cursor.execute(sql)
    result = cursor.fetchall()
    df = pd.DataFrame(list(result))
    video_id = df[0].values.tolist()
    print(video_id[:10])
    db.close()
    return video_id


# Attach device_id and city_id to the corresponding city's hot-diary table.
# Note: the feature order of the prediction set below must match the training set.
def feature_en(x_list, device_id):
    data = pd.DataFrame(x_list)
    # The column name below must be "cid", not "diaryid", because the ffm
    # transformer used by the prediction model was fitted on "cid".
    data = data.rename(columns={0: "cid"})
    data["device_id"] = device_id
    now = datetime.now()
    data["hour"] = now.hour
    data["minute"] = now.minute
    data.loc[data["hour"] == 0, ["hour"]] = 24
    data.loc[data["minute"] == 0, ["minute"]] = 60
    data["hour"] = data["hour"].astype("category")
    data["minute"] = data["minute"].astype("category")
    # Although we are predicting y, the ffm conversion requires a y column;
    # it does not affect the prediction results.
    data["y"] = 0
    # print("done feature engineering")
    return data


# Load ffm.pkl and convert the table above into ffm format.
def transform_ffm_format(df, queue_name, device_id):
    # with open(DIRECTORY_PATH + "ffm.pkl", "rb") as f:
    with open("/Users/mac/utils/ffm.pkl", "rb") as f:
        ffm_format_pandas = pickle.load(f)
    data = ffm_format_pandas.native_transform(df)
    # predict_file_name = DIRECTORY_PATH + "result/{0}_{1}.csv".format(device_id, queue_name)
    predict_file_name = "/Users/mac/utils/result/{0}.csv".format(queue_name)
    data.to_csv(predict_file_name, index=False, header=None)
    print("done ffm")
    return predict_file_name


# Load the model and predict.
def predict(queue_name, queue_arg, device_id, city_id):
    data = feature_en(queue_arg[0], device_id)
    data_file_path = transform_ffm_format(data, queue_name, device_id)
    ffm_model = xl.create_ffm()
    ffm_model.setTest(data_file_path)
    ffm_model.setSigmoid()
    ffm_model.predict("/Users/mac/utils/model.out",
                      "/Users/mac/utils/result/{0}_output.txt".format(queue_name))
    # ffm_model.predict(DIRECTORY_PATH + "model.out",
    #                   DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id, queue_name))
    return save_result(queue_name, queue_arg, device_id)


def save_result(queue_name, queue_arg, device_id):
    # score_df = pd.read_csv(DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id, queue_name), header=None)
    score_df = pd.read_csv("/Users/mac/utils/result/{0}_output.txt".format(queue_name), header=None)
    # print(score_df)
    mm_scaler = MinMaxScaler()
    mm_scaler.fit(score_df)
    score_df = pd.DataFrame(mm_scaler.transform(score_df))
    score_df = score_df.rename(columns={0: "score"})
    score_df["cid"] = queue_arg[0]
    # Strip the "diary|" prefix from cid.
    score_df["cid"] = score_df["cid"].apply(lambda x: x[6:])
    print("score_df:")
    print(score_df.head(1))
    print(score_df.shape)
    if queue_arg[1] != []:
        df_temp = pd.DataFrame(queue_arg[1]).rename(columns={0: "cid"})
        df_temp["score"] = 0
        df_temp = df_temp.sort_index(axis=1, ascending=False)
        df_temp["cid"] = df_temp["cid"].apply(lambda x: x[6:])
        print("temp_df:")
        print(df_temp.head(1))
        print(df_temp.shape)
        predict_score_df = pd.concat([score_df, df_temp])
        print("score_df:")
        print(predict_score_df.head(1))
        print(predict_score_df.shape)
        return merge_score(queue_name, queue_arg, predict_score_df)
    else:
        return merge_score(queue_name, queue_arg, score_df)
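
# A minimal, self-contained sketch (not part of the pipeline) of the rescaling
# step in save_result() above: the raw ffm outputs are min-max scaled to [0, 1]
# so they can later be added to the database scores in merge_score() on a
# comparable scale. The input values here are made up for illustration.
def _minmax_rescale_demo():
    from sklearn.preprocessing import MinMaxScaler
    raw = pd.DataFrame({"score": [0.12, 0.87, 0.45]})  # hypothetical ffm outputs
    scaled = pd.DataFrame(MinMaxScaler().fit_transform(raw), columns=["score"])
    # min maps to 0.0, max to 1.0, the middle value to 0.44
    return scaled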
def merge_score(queue_name, queue_arg, predict_score_df):
    db = pymysql.connect(host='rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com', port=3306,
                         user='work', passwd='workwork', db='zhengxing_test')
    cursor = db.cursor()
    # Strip the "diary|" prefix from diary_id.
    diary_list = tuple(list(map(lambda x: x[6:], queue_arg[2])))
    print(diary_list)
    sql = "select score,diary_id from biz_feed_diary_score where diary_id in {};".format(diary_list)
    cursor.execute(sql)
    result = cursor.fetchall()
    score_df = pd.DataFrame(list(result)).rename(columns={0: "score", 1: "cid"})
    print("diary score table")
    print(score_df.head(2))
    db.close()
    return update_diary_queue(score_df, predict_score_df)


def update_diary_queue(score_df, predict_score_df):
    diary_id = score_df["cid"].values.tolist()
    # Every 5th diary, starting at position 1, is a video diary.
    video_id = diary_id[1::5]
    if len(video_id) > 0:
        not_video = list(set(diary_id) - set(video_id))
        # So the cids line up when the scores are added, first make cid the
        # index; after adding, restore cid to a column.
        not_video_df = score_df.loc[score_df["cid"].isin(not_video)].set_index(["cid"])
        not_video_predict_df = predict_score_df.loc[predict_score_df["cid"].isin(not_video)].set_index(["cid"])
        not_video_df["score"] = not_video_df["score"] + not_video_predict_df["score"]
        not_video_df = not_video_df.sort_values(by="score", ascending=False)
        video_df = score_df.loc[score_df["cid"].isin(video_id)].set_index(["cid"])
        video_predict_df = predict_score_df.loc[predict_score_df["cid"].isin(video_id)].set_index(["cid"])
        video_df["score"] = video_df["score"] + video_predict_df["score"]
        video_df = video_df.sort_values(by="score", ascending=False)
        not_video_id = not_video_df.index.tolist()
        video_id = video_df.index.tolist()
        diary_id = not_video_id
        i = 1
        for j in video_id:
            diary_id.insert(i, j)
            # TODO The 3 below is for testing; change it to 5 after going live.
            i += 3
        print("score merge succeeded")
        return diary_id
    # If there are no video diaries
    else:
        score_df = score_df.set_index(["cid"])
        predict_score_df = predict_score_df.set_index(["cid"])
        score_df["score"] = score_df["score"] + predict_score_df["score"]
        score_df = score_df.sort_values(by="score", ascending=False)
        print("score merge succeeded (no videos)")
        return score_df.index.tolist()


def update_sql_diary_queue(queue_name, diary_id, device_id, city_id):
    db = pymysql.connect(host='rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com', port=3306,
                         user='work', passwd='workwork', db='doris_test')
    cursor = db.cursor()
    id_str = ",".join(str(x) for x in diary_id)
    print("before write")
    print(id_str[:80])
    sql = "update device_diary_queue set {}='{}' where device_id = '{}' and city_id = '{}'".format(
        queue_name, id_str, device_id, city_id)
    cursor.execute(sql)
    db.commit()
    db.close()
    print("diary ids written successfully")


# Fetch the latest native_queue before updating.
def get_native_queue(device_id, city_id):
    db = pymysql.connect(host='rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com', port=3306,
                         user='work', passwd='workwork', db='doris_test')
    cursor = db.cursor()
    sql = "select native_queue from device_diary_queue where device_id = '{}' and city_id = '{}';".format(
        device_id, city_id)
    cursor.execute(sql)
    result = cursor.fetchall()
    df = pd.DataFrame(list(result))
    db.close()
    if not df.empty:
        native_queue = df.loc[0, 0].split(",")
        native_queue = list(map(lambda x: "diary|" + str(x), native_queue))
        # print("fetched native_queue successfully")
        return native_queue
    else:
        return None


def multi_update(queue_name, queue_arg, device_id, city_id):
    if queue_arg[0] != []:
        diary_id = predict(queue_name, queue_arg, device_id, city_id)
        return diary_id
    else:
        print("prediction set is empty; no prediction needed")
        return False
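
# A minimal, self-contained demo (not part of the pipeline) of the insertion
# pattern used in update_diary_queue() above: one video diary id is spliced in
# after every two non-video ids (insert positions 1, 4, 7, ..., stepping by 3;
# production is expected to use a step of 5 per the TODO). The ids are made up.
def _interleave_demo():
    not_video = ["n1", "n2", "n3", "n4", "n5", "n6"]
    video = ["v1", "v2", "v3"]
    merged = list(not_video)
    i = 1
    for v in video:
        merged.insert(i, v)
        i += 3
    # -> ['n1', 'v1', 'n2', 'n3', 'v2', 'n4', 'n5', 'v3', 'n6']
    return merged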
def get_queue(device_id, city_id, queue_name):
    db = pymysql.connect(host='rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com', port=3306,
                         user='work', passwd='workwork', db='doris_test')
    cursor = db.cursor()
    sql = "select {} from device_diary_queue " \
          "where device_id = '{}' and city_id = '{}';".format(queue_name, device_id, city_id)
    cursor.execute(sql)
    result = cursor.fetchall()
    df = pd.DataFrame(list(result))
    db.close()
    if not df.empty:
        queue_list = df.loc[0, 0].split(",")
        queue_list = list(map(lambda x: "diary|" + str(x), queue_list))
        return queue_list
    else:
        print("this user's diary queue is empty")
        return False


def user_update(device_id, city_id):
    global native_queue_list
    queue_name_list = ["native_queue", "nearby_queue", "nation_queue", "megacity_queue"]
    for queue_name in queue_name_list:
        queue_list = get_queue(device_id, city_id, queue_name)
        if queue_name == "native_queue":
            native_queue_list = queue_list
        if queue_list:
            queue_predict = list(set(queue_list) & set(data_set_cid))
            queue_not_predict = list(set(queue_list) - set(data_set_cid))
            queue_arg = [queue_predict, queue_not_predict, queue_list]
            diary_id = multi_update(queue_name, queue_arg, device_id, city_id)
            # Only write back if native_queue has not changed since it was read.
            if diary_id and (native_queue_list == get_native_queue(device_id, city_id)):
                update_sql_diary_queue(queue_name, diary_id, device_id, city_id)
                print("update finished")
            else:
                print("no need to update the diary queue")
        else:
            print("diary queue is empty")


if __name__ == "__main__":
    # while True:
    #     # TODO When deploying, adjust get_active_users: it currently does not
    #     # return city_id; change it to return a df of city_id and device_id.
    #     empty, df = get_active_users()
    #     if empty:
    #         for eachFile in os.listdir("/tmp"):
    #             if "xlearn" in eachFile:
    #                 os.remove("/tmp" + "/" + eachFile)
    #         time.sleep(58)
    #     else:
    #         old_device_id_list = pd.read_csv(DIRECTORY_PATH + "data_set_device_id.csv")["device_id"].values.tolist()
    #         device_id_list = df["device_id"].values.tolist()
    #         # Intersect active users with old users, i.e. only predict for old users.
    #         predict_list = list(set(device_id_list) & set(old_device_id_list))
    #         # # Only predict IDs ending in 6; this could also be filtered when querying the database.
    #         # predict_list = list(filter(lambda x: str(x)[-1] == "6", predict_list))
    #         df = df.loc[df["device_id"].isin(predict_list)]
    #         device_list = df["device_id"].values.tolist()
    #         city_list = df["city_id"].values.tolist()
    #         device_city_list = list(zip(device_list, city_list))
    #         start = time.time()

    # When switching between test and production, update the model, pickle,
    # output-file, and input-file paths.
    warnings.filterwarnings("ignore")
    # data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist()
    data_set_cid = pd.read_csv("/Users/mac/utils/data_set_cid.csv")["cid"].values.tolist()
    device_city_list = [("356156075348110", "tianjin")]
    if device_city_list != []:
        for i in device_city_list:
            user_update(i[0], i[1])
    else:
        print("this list contains only new users; no prediction needed")
    end = time.time()
    # # TODO After going live, switch per-user prediction to multiprocessing.
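
    # A sketch of the multiprocessing TODO above, assuming user_update() is
    # process-safe (each call opens its own DB connections) and that a worker
    # count of 4 is acceptable; both are assumptions, not part of this code yet:
    #
    # with Pool(processes=4) as pool:
    #     pool.starmap(user_update, device_city_list)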