Commit 0496a874 authored by 高雅喆's avatar 高雅喆

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

add gray stat pyScript
parents 6b26a267 4b43b998
......@@ -16,6 +16,7 @@ from config import *
import socket
def get_video_id(cache_video_id):
if flag:
db = pymysql.connect(host=ONLINE_EAGLE_HOST, port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
......@@ -64,7 +65,7 @@ def transform_ffm_format(df,queue_name,device_id):
data = ffm_format_pandas.native_transform(df)
predict_file_name = path + "result/{0}_{1}.csv".format(device_id, queue_name)
data.to_csv(predict_file_name, index=False, header=None)
# print("done ffm")
print("done ffm")
return predict_file_name
......@@ -118,6 +119,7 @@ def get_score(queue_arg):
result = cursor.fetchall()
score_df = pd.DataFrame(list(result)).dropna()
db.close()
print("get score")
return score_df
......@@ -146,7 +148,7 @@ def update_dairy_queue(score_df,predict_score_df,total_video_id):
new_queue.insert(i, j)
i += 5
# print("分数合并成功")
print("分数合并成功")
return new_queue
# 如果取交集后没有视频日记
else:
......@@ -154,7 +156,7 @@ def update_dairy_queue(score_df,predict_score_df,total_video_id):
predict_score_df = predict_score_df.set_index(["cid"])
score_df["score"]=score_df["score"]+predict_score_df["score"]
score_df = score_df.sort_values(by="score", ascending=False)
# print("分数合并成功1")
print("分数合并成功1")
return score_df.index.tolist()
# 如果total_video_id是空列表
else:
......@@ -187,8 +189,6 @@ def update_sql_dairy_queue(queue_name, diary_id,device_id, city_id):
def queue_compare(old_list, new_list):
global update_queue_numbers
print("更新日记队列总数:{}".format(update_queue_numbers))
# 去掉前面的"diary|"
old_list = list(map(lambda x: int(x[6:]),old_list))
# print("旧表前十个")
......@@ -205,8 +205,6 @@ def queue_compare(old_list, new_list):
i += 1
if i >0:
update_queue_numbers += 1
print("更新日记队列总数:{}".format(update_queue_numbers))
print("日记队列更新前日记总个数{},位置发生变化个数{},发生变化率{}%".format(len(old_list), i,
round(i / len(old_list) * 100), 2))
......@@ -227,13 +225,13 @@ def get_queue(device_id, city_id,queue_name):
df = pd.DataFrame(list(result))
if df.empty:
# print("该用户对应的日记为空")
print("该用户对应的日记为空")
return False
else:
queue_list = df.loc[0, 0].split(",")
queue_list = list(map(lambda x: "diary|" + str(x), queue_list))
db.close()
# print("成功获取queue")
print("成功获取queue")
return queue_list
......@@ -242,7 +240,7 @@ def pipe_line(queue_name, queue_arg, device_id,total_video_id):
predict_score_df = save_result(queue_name, queue_arg, device_id)
score_df = get_score(queue_arg)
if score_df.empty:
# print("获取的日记列表是空")
print("获取的日记列表是空")
return False
else:
score_df = score_df.rename(columns={0: "score", 1: "cid"})
......@@ -282,33 +280,26 @@ def multi_proecess_update(device_id, city_id, data_set_cid,total_video_id):
if __name__ == "__main__":
warnings.filterwarnings("ignore")
flag = False
path = LOCAL_DIRCTORY
# 下面这个ip是线上服务器ip
if socket.gethostbyname(socket.gethostname()) == '10.31.242.83':
flag = True
path = DIRECTORY_PATH
total_number = 0
flag = True
path = DIRECTORY_PATH
# 下面这个ip是本地电脑ip
if socket.gethostbyname(socket.gethostname()) == '172.30.5.84':
flag = False
path = LOCAL_DIRCTORY
# 增加缓存日记视频列表
cache_video_id = []
cache_device_city_list = []
update_queue_numbers = 0
while True:
data_set_cid = pd.read_csv(path + "data_set_cid.csv")["cid"].values.tolist()
total_video_id = get_video_id(cache_video_id)
cache_video_id = total_video_id
device_city_list = get_active_users(flag)
print("过滤前用户数:{}".format(len(device_city_list)))
device_city_list = get_active_users(flag,path)
# 过滤掉5分钟内预测过的用户
device_city_list = list(set(tuple(device_city_list))-set(tuple(cache_device_city_list)))
print("过滤后用户数:{}".format(len(device_city_list)))
print("缓存视频个数:{}".format(len(cache_device_city_list)))
if datetime.now().minute % 5 == 0:
cache_device_city_list = []
if device_city_list != []:
cache_device_city_list.extend(device_city_list)
total_number += len(device_city_list)
print("累计预测用户总数:{}".format(total_number))
for device_city in device_city_list:
# start = time.time()
multi_proecess_update(device_city[0], device_city[1], data_set_cid, total_video_id)
......
......@@ -27,5 +27,36 @@ def queue():
db.close()
return all
def get_local_device():
    """Dump (device_id, city_id) pairs from the local test DB to a CSV.

    Reads every row of `jerry_test.user_active_time`, drops rows containing
    NULLs, renames the positional columns to meaningful names and writes the
    result to a local CSV file without an index column.

    Side effects: opens/closes a DB connection and writes
    /Users/mac/utils/test_device_city_id.csv.
    """
    db = pymysql.connect(host='192.168.15.12', port=4000, user='root', db='jerry_test')
    try:
        cursor = db.cursor()
        sql = "select device_id,city_id from user_active_time"
        cursor.execute(sql)
        result = cursor.fetchall()
    finally:
        # Close the connection even if the query raises.
        db.close()
    df = pd.DataFrame(list(result)).dropna()
    # BUG FIX: pd.DataFrame(list(result)) produces integer-positional column
    # labels (0, 1); the previous string keys {"0": ..., "1": ...} matched
    # nothing, so the CSV was written with 0/1 as headers.
    df = df.rename(columns={0: "device_id", 1: "city_id"})
    df.to_csv('/Users/mac/utils/test_device_city_id.csv', index=None)
    print("done writing test_device_city_id.csv, rows: {}".format(len(df)))
if __name__=="__main__":
    # NOTE(review): get_video_id is defined elsewhere in this project taking
    # a cache_video_id argument — confirm this no-argument call is valid.
    get_video_id()
    # Test/staging MySQL instance hosting the doris_test schema.
    LOCAL_HOST = 'rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com'
    db = pymysql.connect(host=LOCAL_HOST, port=3306, user='work', passwd='workwork', db='doris_test')
    # Hard-coded fixture: one device/city with a fixed diary-id queue.
    diary_id = [14207355,16197023,13006945,12363565,15296547,15082216,16198052,15228350,13006942,14229868,15303563,16211116,15225921,15250715,15271108,15479315,16197047,15544727,15336944,15486003,15517196,16211130,15547275,15572010]
    device_id = '99000645287876'
    city_id = 'beijing'
    cursor = db.cursor()
    # Join the diary ids into one comma-separated string; the same string is
    # used for all four queue columns of device_diary_queue.
    id_str = str(diary_id[0])
    for i in range(1, len(diary_id)):
        id_str = id_str + "," + str(diary_id[i])
    sql = "insert into device_diary_queue values ('{}','{}','{}','{}','{}','{}',89)".format \
        (device_id, city_id,id_str,id_str,id_str,id_str)
    cursor.execute(sql)
    db.commit()
    db.close()
    print("成功写入diary_id")
from processData import *
# from processData import *
from diaryTraining import *
import time
from prepareData import fetch_data
from utils import *
import pandas as pd
from config import *
import pickle
def feature_en(data_start_date, data_end_date, validation_date, test_date):
    """Assemble the labelled click/exposure training dataset.

    Fetches exposure and click tables for the given date range, removes
    clicked rows from the exposure table (set difference), labels clicks
    y=1 and remaining exposures y=0, merges the two, and computes the row
    counts for the test and validation dates. Also persists the candidate
    cid and device_id sets for the prediction stage.

    Returns:
        (data, test_number, validation_number): the merged, date-sorted
        DataFrame with stat_date dropped, plus the test/validation row
        counts used for the positional split downstream.
    """
    exposure, click, click_device_id = fetch_data(data_start_date, data_end_date)
    # Set difference: remove rows of the click table from the exposure table.
    print("曝光表处理前的样本个数")
    print(exposure.shape)
    # NOTE(review): click is appended twice, so every click row occurs at
    # least twice and is removed by drop_duplicates(keep=False) below —
    # confirm the double append is intentional and not a merge artifact.
    exposure = exposure.append(click)
    exposure = exposure.append(click)
    subset = click.columns.tolist()
    exposure = exposure.drop_duplicates(subset=subset, keep=False)
    print("差集后曝光表个数")
    print(exposure.shape)
    # Keep only exposures from devices that clicked at least once.
    exposure = exposure.loc[exposure["device_id"].isin(click_device_id)]
    print("去除未点击用户后曝光表个数")
    print(exposure.shape)
    # Labelling: clicks are positives, remaining exposures negatives.
    click["y"] = 1
    exposure["y"] = 0
    print("正样本个数")
    print(click.shape[0])
    print("负样本个数")
    print(exposure.shape[0])
    # Merge the click table and the exposure table.
    data = click.append(exposure)
    print("点击表和曝光表合并成功")
    data = data.sort_values(by="stat_date", ascending=False)
    test_number = data[data["stat_date"] == test_date].shape[0]
    validation_number = data[data["stat_date"] == validation_date].shape[0]
    data = data.drop("stat_date", axis=1)
    # Zero-valued features are dropped by the FFM format; remap hour 0 -> 24
    # and minute 0 -> 60 so that no feature value is 0 afterwards.
    data.loc[data["hour"] == 0, ["hour"]] = 24
    data.loc[data["minute"] == 0, ["minute"]] = 60
    data["hour"] = data["hour"].astype("category")
    data["minute"] = data["minute"].astype("category")
    # Persist the candidate cids; used later to filter the prediction
    # candidate set.
    data_set_cid = data["cid"].unique()
    cid_df = pd.DataFrame()
    cid_df['cid'] = data_set_cid
    cid_df.to_csv(DIRECTORY_PATH + "train/data_set_cid.csv", index=False)
    print("成功保存data_set_cid")
    # Persist the training device_ids so prediction can skip any device_id
    # not present in this set.
    data_set_device_id = data["device_id"].unique()
    device_id_df = pd.DataFrame()
    device_id_df['device_id'] = data_set_device_id
    device_id_df.to_csv(DIRECTORY_PATH + "train/data_set_device_id.csv", index=False)
    print("成功保存data_set_device_id")
    return data, test_number, validation_number
def ffm_transform(data, test_number, validation_number):
    """Convert the labelled DataFrame to FFM format and write the
    train/validation/test CSV splits under DIRECTORY_PATH.

    The fitted transformer is pickled to train/ffm.pkl so prediction can
    reuse the same field/feature mapping. The split is positional, based on
    the test/validation row counts computed by feature_en.
    """
    print("Start ffm transform")
    start = time.time()
    ffm_train = multiFFMFormatPandas()
    # When server memory is free, the 4 below can be raised to 6. 4 is more
    # stable: with 6, the worker processes may fail to get memory and the
    # script can die when other programs occupy too much of the server.
    data = ffm_train.fit_transform(data, y='y',n=50000,processes=4)
    with open(DIRECTORY_PATH+"train/ffm.pkl", "wb") as f:
        pickle.dump(ffm_train, f)
    print("done transform ffm")
    end = time.time()
    print("ffm转化数据耗时(分):")
    print((end - start)/60)
    data.to_csv(DIRECTORY_PATH + "total_ffm_data.csv", index=False)
    # NOTE(review): the file above is written with a header row but read
    # back with header=None, so the header line becomes the first data row
    # of the test split — confirm this offset is intentional.
    data = pd.read_csv(DIRECTORY_PATH + "total_ffm_data.csv", header=None)
    print("数据集大小")
    print(data.shape)
    test = data.loc[:test_number]
    print("测试集大小")
    print(test.shape[0])
    test.to_csv(DIRECTORY_PATH + "test_ffm_data.csv", index=False, header=None)
    # Caution: the test-set date must be later than the validation date,
    # otherwise this positional split can cut the data incorrectly.
    validation = data.loc[(test_number + 1):(test_number + validation_number)]
    print("验证集大小")
    print(validation.shape[0])
    validation.to_csv(DIRECTORY_PATH + "validation_ffm_data.csv", index=False, header=None)
    train = data.loc[(test_number + validation_number + 1):]
    print("训练集大小")
    print(train.shape[0])
    # TODO validation date is not the end of train date
    train.to_csv(DIRECTORY_PATH + "train_ffm_data.csv", index=False, header=None)
# 把数据获取、特征转换、模型训练的模型串联在一起
if __name__ == "__main__":
......
......@@ -4,10 +4,11 @@ from config import *
import pandas as pd
import os
import time
import pymysql
# 获取当下一分钟内活跃用户
def get_active_users(flag):
def get_active_users(flag,path):
now = datetime.now()
now_start = str(now)[:16] + ":00"
now_end = str(now)[:16] + ":59"
......@@ -16,17 +17,37 @@ def get_active_users(flag):
if flag:
df = con_sql(sql)
else:
pass
# df = 问一下亚男,如果没有,造表,造数据
db = pymysql.connect(host='192.168.15.12', port=4000, user='root', db='jerry_test')
sql = "select device_id,city_id from user_active_time"
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result)).dropna()
db.close()
if df.empty:
print("当下这一分钟没有活跃用户,不需要预测")
time.sleep(56)
return []
else:
df = df.rename(columns={0: "device_id", 1: "city_id"})
old_device_id_list = pd.read_csv(DIRECTORY_PATH + "data_set_device_id.csv")["device_id"].values.tolist()
# 统计活跃用户中尾号是6的用户数
temp_list = df[0].values.tolist()
tail6_file_path = path + "{}tail6Unique.csv".format(str(now)[:10])
if os.path.exists(tail6_file_path):
# 尾号是6的活跃用户数
tail_6_list = eval(pd.read_csv(tail6_file_path).loc[0, "list"])
else:
tail_6_list = []
tail_6_list.extend(list(filter(lambda x: (str(x)[-1] == "6"), temp_list)))
if tail_6_list != []:
df_tail_6 = pd.DataFrame({"number": [len(set(tail_6_list))], "time": [str(now)[:16]],
"list": [list(set(tail_6_list))]})
df_tail_6.to_csv(tail6_file_path, index=None)
print("截止现在尾号是6的独立活跃数:{}".format(len(set(tail_6_list))))
old_device_id_list = pd.read_csv(path + "data_set_device_id.csv")["device_id"].values.tolist()
# 求活跃用户和老用户的交集,也就是只预测老用户
df = df.loc[df["device_id"].isin(old_device_id_list)]
df = df.loc[df[0].isin(old_device_id_list)]
if df.empty:
print("该列表是新用户,不需要预测")
time.sleep(56)
......@@ -34,7 +55,7 @@ def get_active_users(flag):
else:
# TODO 正式上线后注释下面的只预测尾号是6的代码
# 只预测尾号是6的ID,这块是测试要求的
device_temp_list = df["device_id"].values.tolist()
device_temp_list = df[0].values.tolist()
predict_list = list(filter(lambda x: (str(x)[-1] == "6") or (str(x)=="358035085192742")
or str(x)=="AB20292B-5D15-4C44-9429-1C2FF5ED26F6",
......@@ -44,11 +65,25 @@ def get_active_users(flag):
time.sleep(56)
return []
else:
df = df.loc[df["device_id"].isin(predict_list)]
device_list = df["device_id"].values.tolist()
city_list = df["city_id"].values.tolist()
df = df.loc[df[0].isin(predict_list)]
device_list = df[0].values.tolist()
city_list = df[1].values.tolist()
device_city_list = list(zip(device_list, city_list))
print("当下这一分钟预测用户数量:{}".format(len(device_city_list)))
#统计尾号6的预测用户
predict_file_path = path + "{}predictTail6Unique.csv".format(str(now)[:10])
if os.path.exists(predict_file_path):
# 预测过尾号是6的用户数
all_predict_list = eval(pd.read_csv(predict_file_path).loc[0, "list"])
else:
all_predict_list = []
all_predict_list.extend(device_city_list)
if all_predict_list != []:
df_predict = pd.DataFrame({"number": [len(set(all_predict_list))], "time": [str(now)[:16]],
"list": [list(set(all_predict_list))]})
df_predict.to_csv(predict_file_path, index=None)
return device_city_list
......
......@@ -80,21 +80,21 @@ def move_file():
def restart_process():
    """Kill any running diaryUpdateOnlineOffline.py process and restart it.

    Scans `ps aux` output for the script, SIGKILLs the matching pid and
    relaunches the script; if no matching process line is found it simply
    launches the script.

    NOTE(review): this diff hunk interleaved the pre-merge
    (diaryQueueUpdate.py) and post-merge (diaryUpdateOnlineOffline.py)
    versions of the same lines; this body reconstructs the post-merge
    version. The exact nesting of the trailing else was not recoverable
    from the stripped indentation — confirm against the repository.
    """
    out = os.popen("ps aux | grep diaryUpdateOnlineOffline.py").read()
    for line in out.splitlines():
        if 'python diaryUpdateOnlineOffline.py' in line:
            pid = int(line.split()[1])
            # Some processes are extremely short-lived or may exit at any
            # moment; this exception must be caught.
            try:
                os.kill(pid, signal.SIGKILL)
                print("已杀死python diaryUpdateOnlineOffline.py 进程")
            except OSError:
                print('没有如此进程!!!')
            os.popen('python diaryUpdateOnlineOffline.py')
            print("成功重启diaryUpdateOnlineOffline.py")
        else:
            os.popen('python diaryUpdateOnlineOffline.py')
            print("成功重启diaryUpdateOnlineOffline.py")
# 把数据写到redis里
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment