import pymysql
import pandas as pd
from datetime import datetime
import time
import pickle
import xlearn as xl

DIRECTORY_PATH = '/data2/models/'
VALIDATION_DATE = '2018-08-05'
TEST_DATE = '2018-08-06'
DATA_START_DATE = '2018-07-05'
DATA_END_DATE = '2018-08-06'
MODEL_VERSION = ''

lr = 0.03
l2_lambda = 0.002


class FFMFormatPandas:
    # Converts a pandas DataFrame into the libffm text format "field:feature:value".
    def __init__(self):
        self.field_index_ = None
        self.feature_index_ = None
        self.y = None

    def fit(self, df, y=None):
        self.y = y
        df_ffm = df[df.columns.difference([self.y])]
        if self.field_index_ is None:
            self.field_index_ = {col: i for i, col in enumerate(df_ffm)}

        if self.feature_index_ is not None:
            last_idx = max(list(self.feature_index_.values()))
        if self.feature_index_ is None:
            self.feature_index_ = dict()
            last_idx = 0

        for col in df.columns:
            vals = df[col].unique()
            for val in vals:
                if pd.isnull(val):
                    continue
                name = '{}_{}'.format(col, val)
                if name not in self.feature_index_:
                    self.feature_index_[name] = last_idx
                    last_idx += 1
            self.feature_index_[col] = last_idx
            last_idx += 1
        return self

    def fit_transform(self, df, y=None):
        self.fit(df, y)
        return self.transform(df)

    def transform_row_(self, row, t):
        ffm = []
        if self.y is not None:
            ffm.append(str(row.loc[row.index == self.y][0]))
        if self.y is None:
            ffm.append(str(0))

        for col, val in row.loc[row.index != self.y].to_dict().items():
            col_type = t[col]
            name = '{}_{}'.format(col, val)
            if col_type.kind == 'O':
                ffm.append('{}:{}:1'.format(self.field_index_[col], self.feature_index_[name]))
            elif col_type.kind == 'i':
                ffm.append('{}:{}:{}'.format(self.field_index_[col], self.feature_index_[col], val))
        return ' '.join(ffm)

    def transform(self, df):
        t = df.dtypes.to_dict()
        return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})

    # The method below is not part of the original class; it was added to check
    # whether a user (feature) exists in the training data's feature index.
    def is_feature_index_exist(self, name):
        return name in self.feature_index_


def con_sql(sql):
    # Run a SQL query and return the result set as a DataFrame with NaN rows dropped.
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    print("Fetched data from the database")
    df = pd.DataFrame(list(result)).dropna()
    print("Converted the result set to a DataFrame")
    db.close()
    return df


# Get the users active within the current minute
def get_active_users():
    now = datetime.now()
    now_start = str(now)[:16] + ":00"
    now_end = str(now)[:16] + ":59"
    sql = "select device_id from user_active_time " \
          "where active_time <= '{}' and active_time >= '{}'".format(now_end, now_start)
    device_id_df = con_sql(sql)
    if device_id_df.empty:
        print("No active users in the current minute; no prediction needed")
        return True, None
    else:
        device_id_list = device_id_df[0].values.tolist()
        # Deduplicate device_ids
        device_id_list = list(set(device_id_list))
        print("Fetched the users active within the current minute")
        return False, device_id_list


def fetch_user_profile(device_id):
    sql = "select device_id,city_id from data_feed_click where device_id = '{0}' limit 1".format(device_id)
    user_profile = con_sql(sql)
    if user_profile.empty:
        print("Could not find a city_id for this user")
        # The return values below were changed to debug supervisor
        return {1: 2}, 1
    else:
        user_profile = user_profile.rename(columns={0: "device_id", 1: "city_id"})
        print("Fetched the city_id for this user")
        user_profile_dict = {}
        for i in user_profile.columns:
            user_profile_dict[i] = user_profile.loc[0, i]
        # The return values below were changed to debug supervisor
        return user_profile_dict, "0"


def feature_en(user_profile):
    # Build prediction instances for this user from the per-city diary candidate set.
    file_name = DIRECTORY_PATH + "diaryTestSet/{0}DiaryTop3000.csv".format(user_profile['city_id'])
    data = pd.read_csv(file_name)
    data["device_id"] = user_profile['device_id']

    now = datetime.now()
    data["hour"] = now.hour
    data["minute"] = now.minute
    data.loc[data["hour"] == 0, ["hour"]] = 24
    data.loc[data["minute"] == 0, ["minute"]] = 60
    data["hour"] = data["hour"].astype("category")
    data["minute"] = data["minute"].astype("category")
    # y is what we predict, but the FFM conversion requires a y column;
    # this placeholder does not affect the prediction results.
    data["y"] = 0
    data = data.drop("city_id", axis=1)
    print(data.head(1))
    print("Feature engineering finished")
    return data


def transform_ffm_format(df, device_id):
    file_path = DIRECTORY_PATH + "ffm_{0}_{1}.pkl".format(DATA_START_DATE, DATA_END_DATE)
    with open(file_path, "rb") as f:
        ffm_format_pandas = pickle.load(f)
        data = ffm_format_pandas.transform(df)
        now = datetime.now().strftime("%Y-%m-%d-%H-%M")
        print("FFM format conversion finished")
        predict_file_name = DIRECTORY_PATH + "result/{0}_{1}DiaryTop3000.csv".format(device_id, now)
        data.to_csv(predict_file_name, index=False, header=None)
        print("FFM file written to the server")
        return predict_file_name


def wrapper_result(user_profile, instance):
    # Join the predicted probabilities with the candidate diary ids and keep the top 50.
    proba = pd.read_csv(DIRECTORY_PATH + "result/{0}_output.txt".format(user_profile['device_id']), header=None)
    proba = proba.rename(columns={0: "prob"})
    proba["cid"] = instance['cid']
    proba = proba.sort_values(by="prob", ascending=False)
    proba = proba.head(50)
    return proba


def predict_save_to_local(user_profile, instance):
    proba = wrapper_result(user_profile, instance)
    proba.loc[:, "url"] = proba["cid"].apply(lambda x: "http://m.igengmei.com/diary_book/" + str(x[6:]) + '/')
    proba.to_csv(DIRECTORY_PATH + "result/feed_{}".format(user_profile['device_id']), index=False)
    print("Prediction candidate set saved locally")


def predict(user_profile):
    instance = feature_en(user_profile)
    instance_file_path = transform_ffm_format(instance, user_profile["device_id"])
    ffm_model = xl.create_ffm()
    ffm_model.setTest(instance_file_path)
    ffm_model.setSigmoid()
    ffm_model.predict(DIRECTORY_PATH + "model_{0}-{1}_lr{2}_lambda{3}.out".format(DATA_START_DATE, DATA_END_DATE, lr, l2_lambda),
                      DIRECTORY_PATH + "result/{0}_output.txt".format(user_profile['device_id']))
    print("Prediction finished")
    # predict_save_to_local(user_profile, instance)


def router(device_id):
    user_profile, not_exist = fetch_user_profile(device_id)
    if not_exist == 1:
        print("Sorry, we don't have you.")
    else:
        predict(user_profile)


if __name__ == "__main__":
    sql = "delete from data_feed_click where stat_date = '2018-10-17'"
    while True:
        start = time.time()
        empty, device_id_list = get_active_users()
        if empty:
            time.sleep(30)
        else:
            old_device_id_list = pd.read_csv(DIRECTORY_PATH + "data_set_device_id.csv")["device_id"].values.tolist()
            for device_id in device_id_list:
                if device_id in old_device_id_list:
                    # router(device_id)
                    user_profile, not_exist = fetch_user_profile(device_id)
                    if not_exist == 1:
                        print("Sorry, we don't have you.")
                    else:
                        predict(user_profile)
                else:
                    print("This user is not an existing user; cannot predict")
            # end = time.time()
            # time_cost = (end - start)
            # print("Took {} seconds".format(time_cost))