Commit cb34d9d3 authored by 张彦钊

change ffm process

parent c5bcb8e4
DIRECTORY_PATH = '/data2/models/'
# The test date must be later than the validation date, because the code that
# splits the data set is written with that assumption.
# VALIDATION_DATE = '2018-08-05'
......@@ -13,5 +11,33 @@ MODEL_VERSION = ''
lr = 0.03
l2_lambda = 0.002
# processData.py
# diaryTraining.py
# IP of the host holding the online (production) diary-video data
ONLINE_EAGLE_HOST = '10.66.157.22'
# IP of the host holding the test diary-video data
LOCAL_EAGLE_HOST = "192.168.15.12"
# Local directory
LOCAL_DIRCTORY = "/Users/mac/utils/"
# NOTE(review): the commented-out notes below record offline settings and
# include database credentials; consider moving them out of version control.
# # Offline pkl
# "/Users/mac/utils/ffm.pkl"
# # Offline prediction file
# "/Users/mac/utils/result/{0}.csv".format(queue_name)
# # Offline model and prediction output files
# "/Users/mac/utils/model.out",
# "/Users/mac/utils/result/{0}_output.txt".format(queue_name)
#
# # Offline diary queue
# host='rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com', port=3306, user='work',
# passwd='workwork', db='doris_test'
# select native_queue from device_diary_queue where device_id = '{}' and city_id = '{}';".for
# update device_diary_queue set {}='{}' where device_id = '{}' and city_id = '{}'".format\
# (queue_name,id_str,device_id, city_id)
#
# # Offline diary score table
# host='rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com', port=3306, user='work',
# passwd='workwork', db='zhengxing_test'
# "select score,diary_id from biz_feed_diary_score where diary_id in {};".format(diary_list)
#!/srv/envs/nvwa/bin/python
# -*- coding: utf-8 -*-
import pickle
import xlearn as xl
import pandas as pd
import pymysql
from datetime import datetime
# utils 包必须要导,否则ffm转化时用到的pickle找不到utils,会报错
import utils
import warnings
from multiprocessing import Pool
from userProfile import get_active_users
from sklearn.preprocessing import MinMaxScaler
import time
from config import *
import socket
def get_video_id(cache_video_id):
    """Fetch every diary id listed in the ``feed_diary_boost`` table.

    The online eagle database is used when the module-level ``flag`` is
    truthy; otherwise the local one is used.

    Args:
        cache_video_id: Ids returned by a previous call. They are handed back
            unchanged when the table currently yields no rows.

    Returns:
        A list of diary ids, or ``cache_video_id`` when the query result is
        empty.
    """
    # NOTE(review): the database password is hard-coded here; consider
    # loading it from configuration instead.
    if flag:
        db = pymysql.connect(host=ONLINE_EAGLE_HOST, port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
    else:
        # The local database has no password, so connecting may fail.
        db = pymysql.connect(host=LOCAL_EAGLE_HOST, port=4000, user='root', db='eagle')
    try:
        cursor = db.cursor()
        cursor.execute("select diary_id from feed_diary_boost;")
        result = cursor.fetchall()
    finally:
        # Release the connection even when the query raises.
        db.close()

    df = pd.DataFrame(list(result))
    print("videio_id 预览")
    print(df.head(1))
    if df.empty:
        return cache_video_id
    return df[0].values.tolist()
# Build the prediction feature frame for one device. Note: the feature order
# of the prediction set must stay consistent with the training set.
def feature_en(x_list, device_id):
    """Return a feature DataFrame for the diaries in ``x_list``.

    Args:
        x_list: Diary identifiers; they become the ``cid`` column.
        device_id: Value written to every row of the ``device_id`` column.

    Returns:
        A DataFrame with the columns ``cid``, ``device_id``, ``hour``,
        ``minute`` (both categorical, taken from the current time) and ``y``.
    """
    current = datetime.now()
    # A zero hour / minute is recorded as 24 / 60 instead.
    hour_value = current.hour if current.hour != 0 else 24
    minute_value = current.minute if current.minute != 0 else 60

    # The column must be named "cid" (not a diary-id name) because the fitted
    # FFM encoder used for prediction was built on "cid".
    frame = pd.DataFrame(x_list).rename(columns={0: "cid"})
    frame["device_id"] = device_id
    frame["hour"] = hour_value
    frame["minute"] = minute_value
    for column in ("hour", "minute"):
        frame[column] = frame[column].astype("category")
    # The FFM conversion needs a label column even though we are predicting;
    # its value does not affect the prediction result.
    frame["y"] = 0
    return frame
# Load ffm.pkl and convert the feature frame into FFM format.
def transform_ffm_format(df, queue_name, device_id):
    """Write ``df`` in FFM format to a per-device, per-queue CSV file.

    Args:
        df: Feature frame to convert.
        queue_name: Queue name, used in the output file name.
        device_id: Device id, used in the output file name.

    Returns:
        Path of the CSV file that was written.
    """
    encoder_path = DIRECTORY_PATH + "ffm.pkl"
    # NOTE(review): pickle.load executes code from the file; the pickle must
    # come from a trusted source.
    with open(encoder_path, "rb") as handle:
        encoder = pickle.load(handle)

    predict_file_name = DIRECTORY_PATH + "result/{0}_{1}.csv".format(device_id, queue_name)
    encoder.native_transform(df).to_csv(predict_file_name, index=False, header=None)
    return predict_file_name
def predict(queue_name, queue_arg, device_id):
    """Run the FFM model on the predictable diaries of one queue.

    Args:
        queue_name: Queue name, used in the intermediate and output file names.
        queue_arg: Sequence whose first item is the list of diaries to score.
        device_id: Device the prediction is made for.

    The scores are written to ``result/output<device_id>_<queue_name>.csv``
    under ``DIRECTORY_PATH``; nothing is returned.
    """
    features = feature_en(queue_arg[0], device_id)
    test_file = transform_ffm_format(features, queue_name, device_id)
    model_file = DIRECTORY_PATH + "model.out"
    output_file = DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id, queue_name)

    model = xl.create_ffm()
    model.setTest(test_file)
    model.setSigmoid()
    model.predict(model_file, output_file)
def save_result(queue_name, queue_arg, device_id):
    """Load the model output for one queue and return per-diary scores.

    Args:
        queue_name: Queue name, used to locate the model output file.
        queue_arg: Sequence where ``queue_arg[0]`` lists the diaries that were
            scored (``"diary|<id>"`` strings, in prediction order) and
            ``queue_arg[1]`` lists the diaries that were not scored.
        device_id: Device id, used to locate the model output file.

    Returns:
        A DataFrame with the columns ``score`` and ``cid``. Scored diaries get
        their min-max scaled model score; diaries that were not scored get 0.
    """
    output_path = DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id, queue_name)
    raw_scores = pd.read_csv(output_path, header=None)

    mm_scaler = MinMaxScaler()
    mm_scaler.fit(raw_scores)
    score_df = pd.DataFrame(mm_scaler.transform(raw_scores)).rename(columns={0: "score"})
    score_df["cid"] = queue_arg[0]
    # Strip the leading "diary|" from each cid.
    score_df["cid"] = score_df["cid"].apply(lambda x: x[6:])

    if not queue_arg[1]:
        return score_df

    df_temp = pd.DataFrame(queue_arg[1]).rename(columns={0: "cid"})
    df_temp["score"] = 0
    # Reorder the columns to (score, cid) so they line up with score_df.
    df_temp = df_temp.sort_index(axis=1, ascending=False)
    df_temp["cid"] = df_temp["cid"].apply(lambda x: x[6:])
    # DataFrame.append was removed in pandas 2.0; pd.concat gives the same
    # result and is available in every pandas version.
    return pd.concat([score_df, df_temp])
def get_score(queue_arg):
    """Load the stored scores for every diary in the user's queue.

    Args:
        queue_arg: Sequence whose third item (``queue_arg[2]``) is the full
            queue as ``"diary|<id>"`` strings.

    Returns:
        A DataFrame with one row per result, column 0 being the score and
        column 1 the diary id, with rows containing nulls dropped. It is empty
        when the queue is empty or nothing matches.
    """
    # Strip the leading "diary|" from each id.
    diary_ids = [cid[6:] for cid in queue_arg[2]]
    if not diary_ids:
        # "in ()" is not valid SQL, so skip the query altogether.
        return pd.DataFrame()

    # One placeholder per id. Formatting a Python tuple into the statement
    # produced "('1',)" for a single id, which the server rejects.
    placeholders = ",".join(["%s"] * len(diary_ids))
    sql = "select score,diary_id from biz_feed_diary_score where diary_id in ({});".format(placeholders)

    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
    try:
        cursor = db.cursor()
        cursor.execute(sql, diary_ids)
        result = cursor.fetchall()
    finally:
        # Release the connection even when the query raises.
        db.close()
    return pd.DataFrame(list(result)).dropna()
def _rank_by_total_score(score_df, predict_score_df, cids=None):
    """Add predicted scores to stored scores and return the cids, best first.

    When ``cids`` is given, both frames are first restricted to those cids.
    """
    if cids is not None:
        score_df = score_df.loc[score_df["cid"].isin(cids)]
        predict_score_df = predict_score_df.loc[predict_score_df["cid"].isin(cids)]
    # Index both frames by cid so the addition lines up per diary.
    stored = score_df.set_index(["cid"])
    predicted = predict_score_df.set_index(["cid"])
    stored["score"] = stored["score"] + predicted["score"]
    return stored.sort_values(by="score", ascending=False).index.tolist()


def update_dairy_queue(score_df, predict_score_df, total_video_id):
    """Build the re-ranked diary queue from stored and predicted scores.

    Args:
        score_df: Frame with ``cid`` and ``score`` columns (stored scores).
        predict_score_df: Frame with ``cid`` and ``score`` columns (predicted
            scores).
        total_video_id: Ids of video diaries; may be an empty list.

    Returns:
        The cids of ``score_df`` ordered by combined score, highest first.
        When some of them are video diaries, the non-video diaries are ranked
        on their own and the ranked video diaries are inserted at positions
        1, 6, 11, ... of that list.
    """
    diary_id = score_df["cid"].values.tolist()

    video_id = []
    if total_video_id != []:
        video_id = list(set(diary_id) & set(total_video_id))

    # No video diary in this queue: rank everything together.
    if len(video_id) == 0:
        return _rank_by_total_score(score_df, predict_score_df)

    not_video = list(set(diary_id) - set(video_id))
    new_queue = _rank_by_total_score(score_df, predict_score_df, not_video)
    ranked_videos = _rank_by_total_score(score_df, predict_score_df, video_id)
    for offset, cid in enumerate(ranked_videos):
        new_queue.insert(1 + 5 * offset, cid)
    return new_queue
def update_sql_dairy_queue(queue_name, diary_id, device_id, city_id):
    """Store a re-ranked diary queue for one device and city.

    Args:
        queue_name: Column of ``device_diary_queue`` to overwrite.
        diary_id: Ordered diary ids; stored as a comma-separated string.
        device_id: Device whose row is updated.
        city_id: City whose row is updated.

    Nothing is written when ``diary_id`` is empty.
    """
    if not diary_id:
        return
    id_str = ",".join(map(str, diary_id))

    # A column name cannot be a bound parameter, so only it is formatted in;
    # the values are passed separately so that quoting is left to the driver.
    # device_id / city_id go through str() to keep the quoted-string
    # comparison the original statement used.
    sql = "update device_diary_queue set {}=%s where device_id = %s and city_id = %s".format(queue_name)

    db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris',
                         passwd='o5gbA27hXHHm', db='doris_prod')
    try:
        cursor = db.cursor()
        cursor.execute(sql, (id_str, str(device_id), str(city_id)))
        db.commit()
    finally:
        # Release the connection even when the update raises.
        db.close()
    print("成功写入diary_id")
def queue_compare(old_list, new_list):
    """Report how many diaries changed position between two queues.

    Args:
        old_list: Previous queue as ``"diary|<id>"`` strings.
        new_list: New queue; its items are compared against the integer ids
            parsed from ``old_list``.

    Increments the module-level ``update_queue_numbers`` and prints a summary
    when at least one diary moved. A diary of the old queue that is missing
    from the new queue counts as moved.

    NOTE(review): this function is reached through ``user_update``, which
    ``multi_proecess_update`` runs in pool workers, so the counter is
    presumably updated in the worker process only - confirm whether the
    running total is meant to be visible in the parent.
    """
    global update_queue_numbers
    print("更新日记队列总数:{}".format(update_queue_numbers))

    # Strip the leading "diary|" and compare on integer ids.
    old_ids = [int(cid[6:]) for cid in old_list]
    old_position = dict(zip(old_ids, range(len(old_ids))))
    new_position = dict(zip(new_list, range(len(new_list))))

    # .get() avoids a KeyError for ids that dropped out of the new queue.
    moved = sum(1 for cid, position in old_position.items()
                if new_position.get(cid) != position)

    if moved > 0:
        update_queue_numbers += 1
        print("更新日记队列总数:{}".format(update_queue_numbers))
        # The precision belongs to round(); it used to be passed as an extra
        # format argument, so the rate was rounded to a whole number.
        print("日记队列更新前日记总个数{},位置发生变化个数{},发生变化率{}%".format(
            len(old_ids), moved, round(moved / len(old_ids) * 100, 2)))
def get_queue(device_id, city_id, queue_name):
    """Read one stored diary queue for a device and city.

    Args:
        device_id: Device whose row is read.
        city_id: City whose row is read.
        queue_name: Column of ``device_diary_queue`` to read.

    Returns:
        The queue as a list of ``"diary|<id>"`` strings, or ``False`` when no
        row matches or the stored queue is NULL / empty.
    """
    # A column name cannot be a bound parameter, so only it is formatted in.
    # device_id / city_id go through str() to keep the quoted-string
    # comparison the original statement used.
    sql = "select {} from device_diary_queue " \
          "where device_id = %s and city_id = %s;".format(queue_name)

    db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris',
                         passwd='o5gbA27hXHHm', db='doris_prod')
    try:
        cursor = db.cursor()
        cursor.execute(sql, (str(device_id), str(city_id)))
        result = cursor.fetchall()
    finally:
        # Close on every path; the no-row path used to return without closing.
        db.close()

    if not result:
        return False
    raw_queue = result[0][0]
    if not raw_queue:
        # A NULL or empty column holds no diaries to re-rank.
        return False
    return ["diary|" + str(diary) for diary in raw_queue.split(",")]
def pipe_line(queue_name, queue_arg, device_id, total_video_id):
    """Predict, load the stored scores and build the re-ranked queue.

    Args:
        queue_name: Name of the queue being processed.
        queue_arg: ``[predictable diaries, other diaries, full queue]``.
        device_id: Device the queue belongs to.
        total_video_id: Ids of video diaries.

    Returns:
        The re-ranked list of diary ids, or ``False`` when no stored scores
        were found for the queue.
    """
    predict(queue_name, queue_arg, device_id)
    predicted_scores = save_result(queue_name, queue_arg, device_id)

    stored_scores = get_score(queue_arg)
    if stored_scores.empty:
        return False

    stored_scores = stored_scores.rename(columns={0: "score", 1: "cid"})
    return update_dairy_queue(stored_scores, predicted_scores, total_video_id)
def user_update(device_id, city_id, queue_name, data_set_cid, total_video_id):
    """Re-rank one stored diary queue of a user and write it back.

    Args:
        device_id: Device whose queue is updated.
        city_id: City whose queue is updated.
        queue_name: Which queue column to process.
        data_set_cid: Cids known to the model; only these can be predicted.
        total_video_id: Ids of video diaries.

    Prints a message and does nothing else when the queue is empty, when none
    of its diaries can be predicted, or when no stored scores are found.
    """
    queue_list = get_queue(device_id, city_id, queue_name)
    if not queue_list:
        print("日记队列为空")
        return

    # Split the queue into diaries the model knows and diaries it does not.
    queue_predict = list(set(queue_list) & set(data_set_cid))
    queue_not_predict = list(set(queue_list) - set(data_set_cid))
    if not queue_predict:
        print("预测集是空,不需要预测")
        return

    queue_arg = [queue_predict, queue_not_predict, queue_list]
    diary_queue = pipe_line(queue_name, queue_arg, device_id, total_video_id)
    if not diary_queue:
        print("获取的日记列表是空,所以不更新日记队列")
        return

    update_sql_dairy_queue(queue_name, diary_queue, device_id, city_id)
    queue_compare(queue_list, diary_queue)
def multi_proecess_update(device_id, city_id, data_set_cid, total_video_id):
    """Update the four diary queues of one user in parallel worker processes.

    Args:
        device_id: Device whose queues are updated.
        city_id: City whose queues are updated.
        data_set_cid: Cids known to the model.
        total_video_id: Ids of video diaries.

    A failure in one queue's worker is printed and does not stop the others.
    """
    queue_name_list = ["native_queue", "nearby_queue", "nation_queue", "megacity_queue"]
    pool = Pool(4)
    pending = []
    try:
        for queue_name in queue_name_list:
            result = pool.apply_async(
                user_update, (device_id, city_id, queue_name, data_set_cid, total_video_id,))
            pending.append((queue_name, result))
    finally:
        pool.close()
        pool.join()

    # apply_async keeps a worker's exception inside its result object; without
    # get() it would be dropped silently.
    for queue_name, result in pending:
        try:
            result.get()
        except Exception as exc:
            print("update of {} failed for device {}: {!r}".format(queue_name, device_id, exc))
# Script entry point: loops forever; each pass reloads data_set_cid.csv,
# refreshes the video-diary ids, fetches the active users, drops the ones held
# in cache_device_city_list and updates the queues of the remaining users.
if __name__ == "__main__":
warnings.filterwarnings("ignore")
flag = False
# The IP below is the production server's IP; flag is True only on that host.
if socket.gethostbyname(socket.gethostname()) == '10.31.242.83':
flag = True
total_number = 0
# Cache of video-diary ids, passed to get_video_id as its fallback value.
cache_video_id = []
cache_device_city_list = []
update_queue_numbers = 0
while True:
if flag:
data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist()
else:
data_set_cid = pd.read_csv(LOCAL_DIRCTORY + "data_set_cid.csv")["cid"].values.tolist()
total_video_id = get_video_id(cache_video_id)
cache_video_id = total_video_id
device_city_list = get_active_users(flag)
print("过滤前用户数:{}".format(len(device_city_list)))
# Filter out users that were already predicted within the last 5 minutes
# (i.e. those still held in cache_device_city_list).
device_city_list = list(set(tuple(device_city_list))-set(tuple(cache_device_city_list)))
print("过滤后用户数:{}".format(len(device_city_list)))
# NOTE(review): the label says "cached video count" but the value printed is
# the length of cache_device_city_list - confirm which one was intended.
print("缓存视频个数:{}".format(len(cache_device_city_list)))
# The user cache is cleared whenever the current minute is a multiple of 5.
if datetime.now().minute % 5 == 0:
cache_device_city_list = []
if device_city_list != []:
cache_device_city_list.extend(device_city_list)
total_number += len(device_city_list)
print("累计预测用户总数:{}".format(total_number))
# Each entry is indexed as (device_id, city_id).
for device_city in device_city_list:
# Timing instrumentation, currently disabled:
# start = time.time()
multi_proecess_update(device_city[0], device_city[1], data_set_cid, total_video_id)
# end = time.time()
# print("updating this user's queues took {} seconds".format((end - start)))
# # TODO: after going live, predict for multiple users in parallel processes
......@@ -68,8 +68,8 @@ def ffm_transform(data, test_number, validation_number):
print("Start ffm transform")
start = time.time()
ffm_train = multiFFMFormatPandas()
# 服务器内存空闲的时候,可以下面的6改成8。6比较稳定,如果服务器内存占用较多的时候,用8可能因为分配不到内存,脚本挂掉。
data = ffm_train.fit_transform(data, y='y',n=50000,processes=6)
# 服务器内存空闲的时候,可以下面的4改成6。4比较稳定,如果服务器内存被其他程序占用较多的时候,用6可能因为分配不到内存,脚本挂掉。
data = ffm_train.fit_transform(data, y='y',n=50000,processes=4)
with open(DIRECTORY_PATH+"train/ffm.pkl", "wb") as f:
pickle.dump(ffm_train, f)
......
......@@ -7,18 +7,19 @@ import time
# 获取当下一分钟内活跃用户
def get_active_users():
def get_active_users(flag):
now = datetime.now()
now_start = str(now)[:16] + ":00"
now_end = str(now)[:16] + ":59"
sql = "select device_id,city_id from user_active_time " \
"where active_time <= '{}' and active_time >= '{}'".format(now_end,now_start)
df = con_sql(sql)
if flag:
df = con_sql(sql)
else:
pass
# df = 问一下亚男,如果没有,造表,造数据
if df.empty:
print("当下这一分钟没有活跃用户,不需要预测")
for eachFile in os.listdir("/tmp"):
if "xlearn" in eachFile:
os.remove("/tmp" + "/" + eachFile)
time.sleep(56)
return []
else:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment