Commit 9caf9b4e authored by 张彦钊's avatar 张彦钊

新增实时预测、预测结果保存到redis

parent 3fcaa79f
......@@ -2,8 +2,9 @@ from config import *
import pandas as pd
import pickle
import xlearn as xl
import datetime
from userProfile import fetch_user_profile
from userProfile import *
import time
from utils import *
# 接收device_id、city_id
......@@ -47,34 +48,39 @@ def transform_ffm_format(df, device_id):
# 将模型加载,预测,把预测日记的概率值按照降序排序,存到一个表里
def predict(user_profile):
user_instance = feature_en(user_profile)
user_instance_file_path = transform_ffm_format(user_instance, user_profile["device_id"])
instance = feature_en(user_profile)
instance_file_path = transform_ffm_format(instance, user_profile["device_id"])
ffm_model = xl.create_ffm()
ffm_model.setTest(user_instance_file_path)
ffm_model.setTest(instance_file_path)
ffm_model.setSigmoid()
ffm_model.predict(DIRECTORY_PATH + "model_{0}-{1}_lr{2}_lambda{3}.out".format(DATA_START_DATE,
DATA_END_DATE, lr, l2_lambda),
DIRECTORY_PATH + "result/{0}_output.txt".format(user_profile['device_id']))
predict_save_to_local(user_profile, instance)
predict_save_to_redis(user_profile, instance)
upload_predict(user_profile, user_instance)
def upload_predict(user_profile, instance):
probabilities = pd.read_csv(DIRECTORY_PATH +
# 将预测结果与device_id 进行拼接,并按照概率降序排序
def wrapper_result(user_profile, instance):
proba = pd.read_csv(DIRECTORY_PATH +
"result/{0}_output.txt".format(user_profile['device_id']), header=None)
probabilities = probabilities.rename(columns={0: "prob"})
probabilities["cid"] = instance['cid']
probabilities = probabilities.sort_values(by="prob", ascending=False)
wrapper_result(probabilities, user_profile['device_id'])
def wrapper_result(prob, device_id):
prob = prob.head(500)
prob.loc[:,"url"] = prob["cid"].apply(lambda x: "http://m.igengmei.com/diary_book/" + str(x[6:]) + '/')
prob.to_csv(DIRECTORY_PATH + "result/feed_{}".format(device_id),index= False)
proba = proba.rename(columns={0: "prob"})
proba["cid"] = instance['cid']
proba = proba.sort_values(by="prob", ascending=False)
proba = proba.head(50)
return proba
# 预测候选集保存到本地
def predict_save_to_local(user_profile, instance):
proba = wrapper_result(user_profile, instance)
proba.loc[:, "url"] = proba["cid"].apply(lambda x: "http://m.igengmei.com/diary_book/" + str(x[6:]) + '/')
proba.to_csv(DIRECTORY_PATH + "result/feed_{}".format(user_profile['device_id']), index=False)
# TODO 写到redis
def predict_save_to_redis(user_profile, instance):
device_id = user_profile['device_id']
cid_list = wrapper_result(user_profile, instance)["cid"].values.tolist()
add_data_to_redis(device_id,cid_list)
def router(device_id):
user_profile, is_exist = fetch_user_profile(device_id)
......@@ -86,4 +92,19 @@ def router(device_id):
if __name__ == "__main__":
router(device_id='EDEE004A-40AC-4ACD-9D3F-D54480EACEE9')
# TODO 如果耗时小于一分钟,下一次取到的device_id和上一次相同
while True:
start = time.time()
empty,device_id_list = get_active_users()
if empty:
print("当前没有活跃用户,不需要预测")
time.sleep(60)
else:
for device_id in device_id_list:
router(device_id)
end = time.time()
# 将秒转化为毫秒
time_cost = (end - start)*1000
print("预测耗时{}毫秒".format(time_cost))
......@@ -17,6 +17,8 @@ pip install pandas
问题:
#安装pandas出错:Python.h: No such file or directory
#解决方案:sudo apt-get install python3-dev
#如果安装包失败,让高雅喆用他的账户在服务器上进行安装。因为女娲项目是他在服务器上的"/home/gaoyazhe"路径下创建的,
# 所以别的账户安装包可能会失败。
pip install scipy
pip install scikit-learn
pip uninstall numpy
......
from processData import *
from diaryTraining import *
from diaryCandidateSet import get_eachCityDiaryTop2000
from diaryCandidateSet import get_eachCityDiaryTop3000
# 把数据获取、特征转换、模型训练的模型串联在一起
......@@ -9,5 +9,5 @@ if __name__ == "__main__":
ffm_transform(data_fe)
train()
print('---------------prepare candidates--------------')
get_eachCityDiaryTop2000()
get_eachCityDiaryTop3000()
from utils import con_sql
from datetime import datetime
# 获取当下一分钟内活跃用户
def get_active_users():
now = datetime.now()
now_start = str(now)[:16] + ":00"
now_end = str(now)[:16] + ":59"
sql = "select device_id from user_active_time " \
"where active_time <= '{}' and active_time >= '{}'".format(now_end,now_start)
device_id_df = con_sql(sql)
device_id_list = device_id_df[0].values.tolist()
# 对device_id 进行去重
device_id_list = list(set(device_id_list))
if device_id_df.empty:
return True,None
else:
return False,device_id_list
def fetch_user_profile(device_id):
# TODO sql语句中的device_id可能对应多个city_id
sql = "select device_id,city_id from " \
"data_feed_click where device_id = '{0}' limit 1".format(device_id)
user_profile = con_sql(sql)
if user_profile.empty:
return {}, user_profile.empty
sql = "select device_id,city_id from data_feed_click where device_id = '{0}' limit 1".format(device_id)
user_profile = con_sql(sql)
if user_profile.empty:
return {}, user_profile.empty
user_profile = user_profile.rename(columns={0:"device_id",1:"city_id"})
user_profile_dict = {}
for i in user_profile.columns:
user_profile_dict[i] = user_profile.loc[0, i]
return user_profile_dict, not user_profile.empty
user_profile = user_profile.rename(columns={0:"device_id",1:"city_id"})
user_profile_dict = {}
for i in user_profile.columns:
user_profile_dict[i] = user_profile.loc[0, i]
return user_profile_dict, not user_profile.empty
......@@ -2,9 +2,10 @@
import pymysql
import pandas as pd
import numpy as np
import redis
# 从数据库的表里获取数据,并转化成df格式
# 从Tidb数据库的表里获取数据,并转化成df格式
def con_sql(sql):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cursor = db.cursor()
......@@ -15,6 +16,25 @@ def con_sql(sql):
return df
class Database:
def __init__(self):
self.host = '10.30.50.58'
self.port = 6379
self.write_pool = {}
def add_write(self, key,val):
self.write_pool[key] = val
def batch_write(self):
r = redis.StrictRedis(host=self.host, port=self.port)
r.mset(self.write_pool)
# 把数据写到redis里
def add_data_to_redis(key,val):
db = Database()
db.add_write(key,val)
db.batch_write()
# ffm 格式转换函数、类
class FFMFormatPandas:
def __init__(self):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment