1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from config import *
import pandas as pd
import pickle
import xlearn as xl
from userProfile import *
import time
from utils import *
import os
def feature_en(user_profile):
    """Build the prediction frame for one user from their city's top-diary CSV.

    Reads ``diaryTestSet/<city_id>DiaryTop3000.csv`` under ``DIRECTORY_PATH``,
    stamps every row with the user's ``device_id`` and the current hour and
    minute, appends a placeholder ``y`` column and drops ``city_id``.

    NOTE: the feature order of this prediction set must stay consistent with
    the training set.

    Args:
        user_profile: mapping that provides ``city_id`` and ``device_id``.

    Returns:
        The resulting pandas DataFrame.
    """
    csv_path = DIRECTORY_PATH + "diaryTestSet/{0}DiaryTop3000.csv".format(user_profile['city_id'])
    frame = pd.read_csv(csv_path)
    frame["device_id"] = user_profile['device_id']

    current = datetime.now()
    # A value of 0 is stored as 24 (hour) / 60 (minute) instead.
    frame["hour"] = 24 if current.hour == 0 else current.hour
    frame["minute"] = 60 if current.minute == 0 else current.minute
    for column in ("hour", "minute"):
        frame[column] = frame[column].astype("category")

    # y is what gets predicted, but the FFM conversion requires the column to
    # exist; its value does not affect the prediction result.
    frame["y"] = 0
    return frame.drop("city_id", axis=1)
def transform_ffm_format(df, device_id):
    """Convert ``df`` to FFM format and write it to a per-device file.

    Unpickles the converter stored in ``ffm.pkl`` under ``DIRECTORY_PATH``,
    applies its ``transform`` to ``df`` and writes the result, without index
    or header, to ``result/<device_id>_<timestamp>DiaryTop3000.csv``.

    Args:
        df: frame to convert (as produced by ``feature_en``).
        device_id: identifier used in the output file name.

    Returns:
        Path of the written prediction file.
    """
    with open(DIRECTORY_PATH+"ffm.pkl","rb") as handle:
        converter = pickle.load(handle)
    converted = converter.transform(df)

    # Minute-resolution timestamp keeps successive runs for a device apart.
    stamp = datetime.now().strftime("%Y-%m-%d-%H-%M")
    target_path = DIRECTORY_PATH + "result/{0}_{1}DiaryTop3000.csv".format(device_id, stamp)
    converted.to_csv(target_path, index=False,header=None)
    print("成功将ffm预测文件写到服务器")
    return target_path
def predict(user_profile):
    """Run the FFM model for one user and store the ranked candidates locally.

    Builds the feature frame, converts it to an FFM test file, runs the
    xlearn FFM model stored at ``model.out`` with sigmoid output, writing the
    probabilities to ``result/<device_id>_output.txt``, and finally hands
    over to ``predict_save_to_local``.

    Args:
        user_profile: mapping that provides ``city_id`` and ``device_id``.
    """
    features = feature_en(user_profile)
    test_file = transform_ffm_format(features, user_profile["device_id"])

    model = xl.create_ffm()
    model.setTest(test_file)
    model.setSigmoid()
    model_path = DIRECTORY_PATH + "model.out"
    output_path = DIRECTORY_PATH + "result/{0}_output.txt".format(user_profile['device_id'])
    model.predict(model_path, output_path)
    print("该用户预测结束")

    predict_save_to_local(user_profile, features)
    # TODO: no production redis address has been provided yet, so the redis
    # export stays disabled for now.
    # predict_save_to_redis(user_profile, features)
def wrapper_result(user_profile, instance):
    """Load the predicted probabilities and return the 50 best candidates.

    Reads ``result/<device_id>_output.txt`` (headerless, first column renamed
    to ``prob``), attaches the ``cid`` column of ``instance`` and sorts by
    probability in descending order.

    Args:
        user_profile: mapping that provides ``device_id``.
        instance: frame holding a ``cid`` column for the predicted rows.

    Returns:
        DataFrame with ``prob`` and ``cid``, limited to the first 50 rows.
    """
    output_path = DIRECTORY_PATH + "result/{0}_output.txt".format(user_profile['device_id'])
    scores = pd.read_csv(output_path, header=None).rename(columns={0: "prob"})
    # Assignment aligns on the index; assumes both frames share the default
    # row numbering -- TODO confirm.
    scores["cid"] = instance['cid']
    return scores.sort_values(by="prob", ascending=False).head(50)
def predict_save_to_local(user_profile, instance):
    """Save the ranked prediction candidates to a local CSV file.

    Adds a ``url`` column derived from each ``cid`` and writes the frame,
    without index, to ``result/feed_<device_id>`` under ``DIRECTORY_PATH``.

    Args:
        user_profile: mapping that provides ``device_id``.
        instance: frame holding a ``cid`` column for the predicted rows.
    """
    ranked = wrapper_result(user_profile, instance)

    def to_url(cid):
        # Drops the first six characters of the cid -- presumably a fixed
        # prefix in front of the numeric diary id; verify against the data.
        return "http://m.igengmei.com/diary_book/" + str(cid[6:]) + '/'

    ranked.loc[:, "url"] = ranked["cid"].apply(to_url)
    feed_path = DIRECTORY_PATH + "result/feed_{}".format(user_profile['device_id'])
    ranked.to_csv(feed_path, index=False)
    print("成功将预测候选集保存到本地")
def predict_save_to_redis(user_profile, instance):
    """Save the ranked prediction candidates to redis.

    Passes the device id and the list of ranked ``cid`` values to
    ``add_data_to_redis``.

    Args:
        user_profile: mapping that provides ``device_id``.
        instance: frame holding a ``cid`` column for the predicted rows.
    """
    target_device = user_profile['device_id']
    ranked = wrapper_result(user_profile, instance)
    ranked_cids = ranked["cid"].values.tolist()
    add_data_to_redis(target_device, ranked_cids)
    print("成功将预测候选集保存到redis")
def router(device_id):
    """Predict for ``device_id`` if a user profile exists for it.

    Looks the profile up through ``fetch_user_profile``; when the lookup
    reports the user as missing, only a notice is printed.

    Args:
        device_id: identifier of the device to predict for.
    """
    profile, missing = fetch_user_profile(device_id)
    if not missing:
        predict(profile)
        return
    print('Sorry, we don\'t have you.')
def _timed_router(device_id):
    """Run ``router`` for one device and return the elapsed seconds."""
    started = time.time()
    router(device_id)
    return time.time() - started


def multi_predict(predict_list, processes=12):
    """Predict for many devices in parallel using a process pool.

    Each device is handled by ``router`` inside a worker process. The time
    reported per device is measured inside the worker, so it reflects the
    actual prediction rather than the (non-blocking) task submission.
    Exceptions raised in a worker are reported instead of being silently
    dropped; a failure for one device does not abort the others.

    Args:
        predict_list: iterable of device ids to predict for.
        processes: number of worker processes in the pool.
    """
    pool = Pool(processes)
    try:
        pending = [
            (device_id, pool.apply_async(_timed_router, (device_id,)))
            for device_id in predict_list
        ]
        pool.close()
        pool.join()
    finally:
        # No-op after a clean close/join; releases the workers if submission
        # or waiting was interrupted.
        pool.terminate()

    for device_id, outcome in pending:
        try:
            # get() re-raises whatever exception the worker hit.
            elapsed = outcome.get()
        except Exception as exc:
            print("该用户{}预测失败: {!r}".format(device_id, exc))
        else:
            print("该用户{}预测耗时{}秒".format(device_id, elapsed))
if __name__ == "__main__":
    # TODO: if a round takes less than one minute, the next fetch returns the
    # same device ids as the previous one. Likewise, a user who stays active
    # keeps being predicted again.
    while True:
        no_active_users, active_ids = get_active_users()
        if not no_active_users:
            known_ids = pd.read_csv(DIRECTORY_PATH + "data_set_device_id.csv")["device_id"].values.tolist()
            # Intersect active users with known users, i.e. only users already
            # present in the data set are predicted.
            targets = list(set(active_ids) & set(known_ids))
            multi_predict(targets)
            continue

        # Nothing to predict: remove files in /tmp whose name contains
        # "xlearn", then wait before polling again.
        for entry in os.listdir("/tmp"):
            if "xlearn" not in entry:
                continue
            os.remove("/tmp" + "/" + entry)
        time.sleep(58)
# TODO: before going live, delete or comment out the timers and print calls in
# the prediction flow; prediction is performance-sensitive, so every statement
# that can be dropped should be.