1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
import pymysql
import pandas as pd
from datetime import datetime
import time
import pickle
import xlearn as xl
# Root directory where models, the pickled FFM encoder and prediction results live.
DIRECTORY_PATH = '/data2/models/'
# Dataset split dates (YYYY-MM-DD) used to name model/encoder artifacts.
VALIDATION_DATE = '2018-08-05'
TEST_DATE = '2018-08-06'
DATA_START_DATE = '2018-07-05'
DATA_END_DATE = '2018-08-06'
# Model version tag; empty string means the unversioned default model.
MODEL_VERSION = ''
# Training hyper-parameters; only used here to reconstruct the model file name in predict().
lr = 0.03
l2_lambda = 0.002
class FFMFormatPandas:
    """Encode a pandas DataFrame into libffm text format.

    Each object-dtype value becomes a one-hot feature "field:feature:1";
    each integer column becomes "field:feature:value".  The field and
    feature index maps are learned by fit() and reused by transform().
    """

    def __init__(self):
        # column -> field index; feature name -> feature index; label column name.
        self.field_index_ = None
        self.feature_index_ = None
        self.y = None

    def fit(self, df, y=None):
        """Learn field and feature index maps from *df*; *y* names the label column."""
        self.y = y
        feature_frame = df[df.columns.difference([self.y])]
        if self.field_index_ is None:
            self.field_index_ = {column: pos for pos, column in enumerate(feature_frame)}
        if self.feature_index_ is not None:
            next_idx = max(list(self.feature_index_.values()))
        if self.feature_index_ is None:
            self.feature_index_ = dict()
            next_idx = 0
        for column in df.columns:
            for value in df[column].unique():
                if pd.isnull(value):
                    continue
                feature_name = '{}_{}'.format(column, value)
                if feature_name not in self.feature_index_:
                    self.feature_index_[feature_name] = next_idx
                    next_idx += 1
            # Also reserve an index for the bare column name (used by numeric columns).
            self.feature_index_[column] = next_idx
            next_idx += 1
        return self

    def fit_transform(self, df, y=None):
        """Convenience wrapper: fit on *df*, then transform it."""
        return self.fit(df, y).transform(df)

    def transform_row_(self, row, t):
        """Render one row as a libffm line; *t* maps column name -> dtype."""
        if self.y is not None:
            parts = [str(row.loc[row.index == self.y].iloc[0])]
        else:
            parts = [str(0)]
        for column, value in row.loc[row.index != self.y].to_dict().items():
            kind = t[column].kind
            if kind == 'O':
                feature_name = '{}_{}'.format(column, value)
                parts.append('{}:{}:1'.format(self.field_index_[column],
                                              self.feature_index_[feature_name]))
            elif kind == 'i':
                parts.append('{}:{}:{}'.format(self.field_index_[column],
                                               self.feature_index_[column], value))
        return ' '.join(parts)

    def transform(self, df):
        """Return a Series mapping each row index to its libffm-formatted line."""
        dtypes = df.dtypes.to_dict()
        return pd.Series({idx: self.transform_row_(row, dtypes)
                          for idx, row in df.iterrows()})

    # Not part of the original upstream class: added to check whether a
    # user/feature was present in the training data.
    def is_feature_index_exist(self, name):
        """Return True when *name* was seen during fit()."""
        return name in self.feature_index_
def con_sql(sql):
    """Run *sql* against the jerry_test MySQL database and return the rows
    (NaN rows dropped) as a DataFrame.

    Fix: the original leaked the connection (and never closed the cursor)
    if execute/fetch raised; both are now released via try/finally.
    """
    # NOTE(review): credentials are hard-coded; consider moving them to
    # configuration or environment variables.
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    try:
        cursor = db.cursor()
        try:
            cursor.execute(sql)
            result = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        db.close()
    print("成功从数据库获取数据")
    df = pd.DataFrame(list(result)).dropna()
    print("数据转化df成功")
    return df
# Fetch users active within the current minute.
def get_active_users():
    """Return (empty_flag, device_id_list) for users active in the current minute.

    empty_flag is True (with None list) when nobody was active.
    """
    minute_prefix = str(datetime.now())[:16]  # "YYYY-MM-DD HH:MM"
    sql = "select device_id from user_active_time " \
          "where active_time <= '{}' and active_time >= '{}'".format(
              minute_prefix + ":59", minute_prefix + ":00")
    device_id_df = con_sql(sql)
    if device_id_df.empty:
        print("当下这一分钟没有活跃用户,不需要预测")
        return True, None
    # Deduplicate device ids.
    device_id_list = list(set(device_id_df[0].values.tolist()))
    print("成功获取当下一分钟内活跃用户")
    return False, device_id_list
def fetch_user_profile(device_id):
    """Look up the city_id for *device_id*; return (profile_dict, not_exist_flag).

    not_exist_flag is 1 (with a dummy dict) when no row was found, "0" otherwise.
    """
    # NOTE(review): device_id is interpolated into SQL; it originates from our
    # own database, but a parameterized query would be safer.
    sql = "select device_id,city_id from data_feed_click where device_id = '{0}' limit 1".format(device_id)
    user_profile = con_sql(sql)
    if user_profile.empty:
        print("没有获取到该用户对应的city_id")
        # Return values were changed for supervisor debugging.
        return {1: 2}, 1
    user_profile = user_profile.rename(columns={0: "device_id", 1: "city_id"})
    print("成功获取该用户对应的city_id")
    user_profile_dict = {column: user_profile.loc[0, column]
                         for column in user_profile.columns}
    # Return values were changed for supervisor debugging.
    return user_profile_dict, "0"
def feature_en(user_profile):
    """Build the prediction feature frame for one user from the city's Top-3000 diaries.

    Reads the city's precomputed candidate CSV, attaches the user's device_id
    and the current hour/minute, and adds a dummy label column.
    """
    csv_path = DIRECTORY_PATH + "diaryTestSet/{0}DiaryTop3000.csv".format(user_profile['city_id'])
    data = pd.read_csv(csv_path)
    data["device_id"] = user_profile['device_id']
    moment = datetime.now()
    data["hour"] = moment.hour
    data["minute"] = moment.minute
    # Map hour 0 -> 24 and minute 0 -> 60 to match the training-time encoding.
    data.loc[data["hour"] == 0, ["hour"]] = 24
    data.loc[data["minute"] == 0, ["minute"]] = 60
    data["hour"] = data["hour"].astype("category")
    data["minute"] = data["minute"].astype("category")
    # A label column is required by the ffm conversion; it does not affect prediction.
    data["y"] = 0
    data = data.drop("city_id", axis=1)
    print(data.head(1))
    print("特征工程处理结束")
    return data
def transform_ffm_format(df, device_id):
    """Convert *df* to libffm text with the pickled encoder; return the file path written."""
    encoder_path = DIRECTORY_PATH + "ffm_{0}_{1}.pkl".format(DATA_START_DATE, DATA_END_DATE)
    # NOTE(review): pickle.load is only acceptable because this file is
    # produced by our own training job; never load untrusted pickles.
    with open(encoder_path, "rb") as f:
        ffm_format_pandas = pickle.load(f)
    data = ffm_format_pandas.transform(df)
    print("ffm格式转化结束")
    stamp = datetime.now().strftime("%Y-%m-%d-%H-%M")
    predict_file_name = DIRECTORY_PATH + "result/{0}_{1}DiaryTop3000.csv".format(device_id, stamp)
    data.to_csv(predict_file_name, index=False, header=None)
    print("ffm写到服务器")
    return predict_file_name
def wrapper_result(user_profile, instance):
    """Join raw model scores with diary ids; return the 50 highest-probability rows."""
    output_path = DIRECTORY_PATH + "result/{0}_output.txt".format(user_profile['device_id'])
    proba = pd.read_csv(output_path, header=None).rename(columns={0: "prob"})
    proba["cid"] = instance['cid']
    return proba.sort_values(by="prob", ascending=False).head(50)
def predict_save_to_local(user_profile, instance):
    """Attach diary-book URLs to the top candidates and persist them locally."""
    proba = wrapper_result(user_profile, instance)
    # cid[6:] strips a fixed-length prefix from the cid — TODO confirm prefix format.
    proba.loc[:, "url"] = proba["cid"].apply(
        lambda cid: "http://m.igengmei.com/diary_book/" + str(cid[6:]) + '/')
    proba.to_csv(DIRECTORY_PATH + "result/feed_{}".format(user_profile['device_id']), index=False)
    print("成功将预测候选集保存到本地")
def predict(user_profile):
    """Score the user's candidate diaries with the trained FFM model.

    Writes sigmoid probabilities to result/<device_id>_output.txt.
    """
    instance = feature_en(user_profile)
    instance_file_path = transform_ffm_format(instance, user_profile["device_id"])
    model_path = DIRECTORY_PATH + "model_{0}-{1}_lr{2}_lambda{3}.out".format(
        DATA_START_DATE, DATA_END_DATE, lr, l2_lambda)
    output_path = DIRECTORY_PATH + "result/{0}_output.txt".format(user_profile['device_id'])
    ffm_model = xl.create_ffm()
    ffm_model.setTest(instance_file_path)
    ffm_model.setSigmoid()
    ffm_model.predict(model_path, output_path)
    print("预测结束")
    # predict_save_to_local(user_profile, instance)
def router(device_id):
    """Predict for *device_id* when a profile exists; otherwise report absence."""
    user_profile, not_exist = fetch_user_profile(device_id)
    if not_exist == 1:
        print('Sorry, we don\'t have you.')
        return
    predict(user_profile)
if __name__ == "__main__":
    # Fixes vs. original:
    # - removed a dead (never-executed) but destructive
    #   "delete from data_feed_click ..." SQL string that was only assigned;
    # - replaced the inlined copy of router()'s body with a call to router();
    # - membership test against known device ids now uses a set (O(1) per id);
    # - removed the unused timing variable.
    while True:
        empty, device_id_list = get_active_users()
        if empty:
            # Nobody active this minute; poll again shortly.
            time.sleep(30)
        else:
            # Re-read each cycle so newly trained users are picked up.
            old_device_id_list = pd.read_csv(
                DIRECTORY_PATH + "data_set_device_id.csv")["device_id"].values.tolist()
            known_ids = set(old_device_id_list)
            for device_id in device_id_list:
                if device_id in known_ids:
                    # Only users present in the training data can be predicted.
                    router(device_id)
                else:
                    print("该用户不是老用户,不能预测")