Commit 640a3426 authored by 张彦钊's avatar 张彦钊

add print

parent 22196732
...@@ -301,6 +301,8 @@ if __name__ == "__main__": ...@@ -301,6 +301,8 @@ if __name__ == "__main__":
device_city_list = get_active_users(flag,path,differ) device_city_list = get_active_users(flag,path,differ)
# 过滤掉5分钟内预测过的用户 # 过滤掉5分钟内预测过的用户
device_city_list = list(set(tuple(device_city_list))-set(tuple(cache_device_city_list))) device_city_list = list(set(tuple(device_city_list))-set(tuple(cache_device_city_list)))
print("device_city_list")
print(device_city_list)
if datetime.now().minute % 5 == 0: if datetime.now().minute % 5 == 0:
cache_device_city_list = [] cache_device_city_list = []
if device_city_list != []: if device_city_list != []:
......
DIRECTORY_PATH = '/data/models/'
# 测试日期一定要大于验证日期,因为切割数据集的代码是这样设置的
# VALIDATION_DATE = '2018-08-05'
# TEST_DATE = '2018-08-06'
# DATA_START_DATE = '2018-07-05'
# DATA_END_DATE = '2018-08-06'
MODEL_VERSION = ''
lr = 0.03
l2_lambda = 0.002
#线上日记视频对应的ip
ONLINE_EAGLE_HOST = '10.66.157.22'
# 测试日记视频数据库连接参数
VIDEO_DB = {"host":"192.168.15.12","port":4000,"user":"root","db":'eagle'}
# 本地地址
LOCAL_DIRCTORY = "/Users/mac/utils/"
# 线上日记队列域名
QUEUE_ONLINE_HOST = 'rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com'
# 本地日记队列域名
LOCAL_HOST = 'rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com'
import time
from prepareData import fetch_data
from utils import *
import pandas as pd
from config import *
import pickle
def feature_en(data_start_date, data_end_date, validation_date, test_date):
exposure, click, click_device_id = fetch_data(data_start_date, data_end_date)
# 求曝光表和点击表的差集合
print("曝光表处理前的样本个数")
print(exposure.shape)
exposure = exposure.append(click)
exposure = exposure.append(click)
subset = click.columns.tolist()
exposure = exposure.drop_duplicates(subset=subset, keep=False)
print("差集后曝光表个数")
print(exposure.shape)
exposure = exposure.loc[exposure["device_id"].isin(click_device_id)]
print("去除未点击用户后曝光表个数")
print(exposure.shape)
# 打标签
click["y"] = 1
exposure["y"] = 0
print("正样本个数")
print(click.shape[0])
print("负样本个数")
print(exposure.shape[0])
# 合并点击表和曝光表
data = click.append(exposure)
print("点击表和曝光表合并成功")
data = data.sort_values(by="stat_date", ascending=False)
test_number = data[data["stat_date"] == test_date].shape[0]
validation_number = data[data["stat_date"] == validation_date].shape[0]
data = data.drop("stat_date", axis=1)
# 数值是0的特征会被ffm格式删除,经过下面的处理后,没有数值是0的特征
data.loc[data["hour"] == 0, ["hour"]] = 24
data.loc[data["minute"] == 0, ["minute"]] = 60
data["hour"] = data["hour"].astype("category")
data["minute"] = data["minute"].astype("category")
# 持久化候选cid,选预测候选集时用这个过滤
data_set_cid = data["cid"].unique()
cid_df = pd.DataFrame()
cid_df['cid'] = data_set_cid
cid_df.to_csv(DIRECTORY_PATH + "train/data_set_cid.csv", index=False)
print("成功保存data_set_cid")
# 将device_id 保存,目的是为了判断预测的device_id是否在这个集合里,如果不在,不需要预测
data_set_device_id = data["device_id"].unique()
device_id_df = pd.DataFrame()
device_id_df['device_id'] = data_set_device_id
device_id_df.to_csv(DIRECTORY_PATH + "train/data_set_device_id.csv", index=False)
print("成功保存data_set_device_id")
return data, test_number, validation_number
def ffm_transform(data, test_number, validation_number):
print("Start ffm transform")
start = time.time()
ffm_train = multiFFMFormatPandas()
data = ffm_train.fit_transform(data, y='y',n=50000,processes=20)
with open(DIRECTORY_PATH+"train/ffm.pkl", "wb") as f:
pickle.dump(ffm_train, f)
print("done transform ffm")
end = time.time()
print("ffm转化数据耗时(分):")
print((end - start)/60)
data.to_csv(DIRECTORY_PATH + "total_ffm_data.csv", index=False)
data = pd.read_csv(DIRECTORY_PATH + "total_ffm_data.csv", header=None)
print("数据集大小")
print(data.shape)
test = data.loc[:test_number]
print("测试集大小")
print(test.shape[0])
test.to_csv(DIRECTORY_PATH + "test_ffm_data.csv", index=False, header=None)
# 注意:测试集的日期一定要大于验证集,否则数据切割可能会出现错误
validation = data.loc[(test_number + 1):(test_number + validation_number)]
print("验证集大小")
print(validation.shape[0])
validation.to_csv(DIRECTORY_PATH + "validation_ffm_data.csv", index=False, header=None)
train = data.loc[(test_number + validation_number + 1):]
print("训练集大小")
print(train.shape[0])
# TODO validation date is not the end of train date
train.to_csv(DIRECTORY_PATH + "train_ffm_data.csv", index=False, header=None)
import os
import time
from config import *
# 定期删除特定文件夹内特征的文件
def remove_files(fileDir):
for eachFile in os.listdir(fileDir):
condition_a = os.path.isfile(fileDir + "/" + eachFile)
condition_b = ("DiaryTop3000.csv" in eachFile) or ("output.txt" in eachFile) or ("feed" in eachFile)
if condition_a and condition_b:
ft = os.stat(fileDir + "/" + eachFile)
ltime = int(ft.st_mtime)
# 删除5分钟前的文件
ntime = int(time.time()) - 5*60
if ltime <= ntime:
os.remove(fileDir + "/" + eachFile)
def delete_log():
for eachFile in os.listdir("/tmp"):
if "xlearn" in eachFile:
os.remove("/tmp" + "/" + eachFile)
if __name__ == "__main__":
while True:
delete_log()
remove_files(DIRECTORY_PATH + "result")
print("运行一次")
time.sleep(5*60)
import datetime
import pymysql
import pandas as pd
from sklearn.utils import shuffle
import numpy as np
import xlearn as xl
# 从数据库的表里获取数据,并转化成df格式
def con_sql(sql):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result)).dropna()
db.close()
return df
# 获取点击表里的device_id
sql = "select distinct device_id from data_feed_click where cid_type = 'diary'"
click_device_id = con_sql(sql)[0].values.tolist()
print("成功获取点击表里的device_id")
# 获取点击表里的数据
sql = "select cid,device_id,time from data_feed_click where cid_type = 'diary'"
click = con_sql(sql)
click = click.rename(columns={0:"cid",1:"device_id",2:"time"})
print("成功获取点击表里的数据")
# 获取曝光表里的数据
sql = "select cid,device_id,time from data_feed_exposure where cid_type = 'diary'"
exposure = con_sql(sql)
exposure = exposure.rename(columns={0:"cid",1:"device_id",2:"time"})
print("成功获取曝光表里的数据")
# 求曝光表和点击表的差集合
exposure.append(click)
exposure.append(click)
subset = click.columns.tolist()
exposure = exposure.drop_duplicates(subset=subset,keep=False)
print("成功完成曝光表和点击表的差集合")
exposure = exposure.loc[exposure["device_id"].isin(click_device_id)]
# 打标签
click["y"] = 1
exposure["y"] = 0
print("成功获取正负样本")
# 合并点击表和曝光表
data = click.append(exposure)
print("done 合并点击表和曝光表")
print(data.head(2))
# 从time特征中抽取hour、weekday
data["hour"] = data["time"].apply(lambda x:datetime.datetime.fromtimestamp(x).hour)
data["weekday"] = data["time"].apply(lambda x:datetime.datetime.fromtimestamp(x).weekday())
# 数值是0的特征会被ffm格式删除,经过下面的处理后,没有数值是0的特征
data.loc[data["hour"]==0] = 24
data.loc[data["weekday"]==0] = 7
data["hour"] = data["hour"].astype("category")
data["weekday"] = data["weekday"].astype("category")
data = data.drop("time",axis=1)
print("成功从time特征中抽取hour、weekday")
print(data.head(2))
data = shuffle(data)
print("start ffm transform")
# ffm 格式转换函数、类
class FFMFormatPandas:
def __init__(self):
self.field_index_ = None
self.feature_index_ = None
self.y = None
def fit(self, df, y=None):
self.y = y
df_ffm = df[df.columns.difference([self.y])]
if self.field_index_ is None:
self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
if self.feature_index_ is not None:
last_idx = max(list(self.feature_index_.values()))
if self.feature_index_ is None:
self.feature_index_ = dict()
last_idx = 0
for col in df.columns:
vals = df[col].unique()
for val in vals:
if pd.isnull(val):
continue
name = '{}_{}'.format(col, val)
if name not in self.feature_index_:
self.feature_index_[name] = last_idx
last_idx += 1
self.feature_index_[col] = last_idx
last_idx += 1
return self
def fit_transform(self, df, y=None):
self.fit(df, y)
return self.transform(df)
def transform_row_(self, row, t):
ffm = []
if self.y != None:
ffm.append(str(row.loc[row.index == self.y][0]))
if self.y is None:
ffm.append(str(0))
for col, val in row.loc[row.index != self.y].to_dict().items():
col_type = t[col]
name = '{}_{}'.format(col, val)
if col_type.kind == 'O':
ffm.append('{}:{}:1'.format(self.field_index_[col], self.feature_index_[name]))
elif col_type.kind == 'i':
ffm.append('{}:{}:{}'.format(self.field_index_[col], self.feature_index_[col], val))
return ' '.join(ffm)
def transform(self, df):
t = df.dtypes.to_dict()
return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})
ffm_train = FFMFormatPandas()
data = ffm_train.fit_transform(data, y='y')
print("done transform ffm")
n = np.rint(data.shape[0]/8)
m = np.rint(data.shape[0]*(3/8))
# 1/8的数据集用来做测试集
data.loc[:n].to_csv("/home/zhangyanzhao/test.csv",index = False,header = None)
# 1/4的数据集用来做验证集
data.loc[n+1:m].to_csv("/home/zhangyanzhao/validation.csv",index = False,header = None)
# 剩余的数据集用来做验证集
data.loc[m+1:].to_csv("/home/zhangyanzhao/train.csv",index = False,header = None)
# 销毁data,目的是为了节省内存
data = data.drop(data.index.tolist())
print("start training")
ffm_model = xl.create_ffm()
ffm_model.setTrain("/home/zhangyanzhao/train.csv")
ffm_model.setValidate("/home/zhangyanzhao/validation.csv")
param = {'task':'binary', 'lr':0.2,
'lambda':0.002, 'metric':'auc'}
ffm_model.fit(param, '/home/zhangyanzhao/model.out')
ffm_model.setTest("/home/zhangyanzhao/test.csv")
ffm_model.setSigmoid()
ffm_model.predict("/home/zhangyanzhao/model.out", "/home/zhangyanzhao/output.txt")
print("end")
import pymysql
import pandas as pd
from utils import *
from config import *
import numpy as np
import time
# 候选集cid只能从训练数据集cid中选择
def filter_cid(df):
data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist()
if not df.empty:
df = df.loc[df["cid"].isin(data_set_cid)]
return df
def get_allCitiesDiaryTop3000():
# 获取全国点击量TOP3000日记
sql = "select city_id,cid from data_feed_click2 " \
"where cid_type = 'diary' group by cid order by max(click_count_choice) desc limit 3000"
allCitiesTop3000 = con_sql(sql)
allCitiesTop3000 = allCitiesTop3000.rename(columns={0: "city_id", 1: "cid"})
allCitiesTop3000 = filter_cid(allCitiesTop3000)
allCitiesTop3000.to_csv(DIRECTORY_PATH + "diaryTestSet/allCitiesDiaryTop3000.csv",index=False)
return allCitiesTop3000
def get_cityList():
# 获取全国城市列表
sql = "select distinct city_id from data_feed_click2"
cityList = con_sql(sql)
cityList.to_csv(DIRECTORY_PATH + "diaryTestSet/cityList.csv",index=False)
cityList = cityList[0].values.tolist()
return cityList
def get_eachCityDiaryTop3000():
# 获取每个城市点击量TOP3000日记,如果数量小于3000,用全国点击量TOP3000日记补充
cityList = get_cityList()
allCitiesTop3000 = get_allCitiesDiaryTop3000()
for i in cityList:
sql = "select city_id,cid from data_feed_click2 " \
"where cid_type = 'diary' and city_id = '{0}' group by cid " \
"order by max(click_count_choice) desc limit 3000".format(i)
data = con_sql(sql)
data = data.rename(columns={0: "city_id", 1: "cid"})
data = filter_cid(data)
if data.shape[0] < 3000:
n = 3000 - data.shape[0]
# 全国点击量TOP3000日记中去除该城市的日记
temp = allCitiesTop3000[allCitiesTop3000["city_id"] != i].loc[:n - 1]
data = data.append(temp)
else:
pass
file_name = DIRECTORY_PATH + "diaryTestSet/{0}DiaryTop3000.csv".format(i)
data.to_csv(file_name,index=False)
def pool_method(city,sql,allCitiesTop3000):
data = con_sql(sql)
data = data.rename(columns={0: "city_id", 1: "cid"})
data = filter_cid(data)
if data.shape[0] < 3000:
n = 3000 - data.shape[0]
# 全国点击量TOP3000日记中去除该城市的日记
temp = allCitiesTop3000[allCitiesTop3000["city_id"] != city].loc[:n - 1]
data = data.append(temp)
file_name = DIRECTORY_PATH + "diaryTestSet/{0}DiaryTop3000.csv".format(city)
data.to_csv(file_name, index=False)
# 多线程方法获取全国城市热门日记
def multi_get_eachCityDiaryTop3000(processes=8):
city_list = get_cityList()
allCitiesTop3000 = get_allCitiesDiaryTop3000()
pool = Pool(processes)
for city in city_list:
sql = "select city_id,cid from data_feed_click2 " \
"where cid_type = 'diary' and city_id = '{0}' group by cid " \
"order by max(click_count_choice) desc limit 3000".format(city)
pool.apply_async(pool_method,(city,sql,allCitiesTop3000,))
pool.close()
pool.join()
if __name__ == "__main__":
start = time.time()
multi_get_eachCityDiaryTop3000()
end = time.time()
print("获取各城市热门日记耗时{}分".format((end-start)/60))
#!/srv/envs/nvwa/bin/python
# -*- coding: utf-8 -*-
import pickle
import xlearn as xl
import pandas as pd
import pymysql
from datetime import datetime
# utils 包必须要导,否则ffm转化时用到的pickle找不到utils,会报错
import utils
import warnings
from multiprocessing import Pool
from userProfile import get_active_users
from sklearn.preprocessing import MinMaxScaler
import time
from config import *
def get_video_id(cache_video_id):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
cursor = db.cursor()
sql = "select diary_id from feed_diary_boost;"
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
print("videio_id 预览")
print(df.head(1))
db.close()
if df.empty:
return cache_video_id
else:
video_id = df[0].values.tolist()
return video_id
# 将device_id、city_id拼接到对应的城市热门日记表。注意:下面预测集特征顺序要与训练集保持一致
def feature_en(x_list, device_id):
data = pd.DataFrame(x_list)
# 下面的列名一定要用cid,不能用diaryid,因为预测模型用到的ffm上是cid
data = data.rename(columns={0: "cid"})
data["device_id"] = device_id
now = datetime.now()
data["hour"] = now.hour
data["minute"] = now.minute
data.loc[data["hour"] == 0, ["hour"]] = 24
data.loc[data["minute"] == 0, ["minute"]] = 60
data["hour"] = data["hour"].astype("category")
data["minute"] = data["minute"].astype("category")
# 虽然预测y,但ffm转化需要y,并不影响预测结果
data["y"] = 0
# print("done 特征工程")
return data
# 把ffm.pkl load进来,将上面的数据转化为ffm格式
def transform_ffm_format(df,queue_name,device_id):
with open(DIRECTORY_PATH + "ffm.pkl", "rb") as f:
ffm_format_pandas = pickle.load(f)
data = ffm_format_pandas.native_transform(df)
predict_file_name = DIRECTORY_PATH + "result/{0}_{1}.csv".format(device_id, queue_name)
data.to_csv(predict_file_name, index=False, header=None)
# print("done ffm")
return predict_file_name
def predict(queue_name,queue_arg,device_id):
data = feature_en(queue_arg[0], device_id)
data_file_path = transform_ffm_format(data,queue_name,device_id)
ffm_model = xl.create_ffm()
ffm_model.setTest(data_file_path)
ffm_model.setSigmoid()
ffm_model.predict(DIRECTORY_PATH + "model.out",
DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id, queue_name))
def save_result(queue_name,queue_arg,device_id):
score_df = pd.read_csv(DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id, queue_name), header=None)
mm_scaler = MinMaxScaler()
mm_scaler.fit(score_df)
score_df = pd.DataFrame(mm_scaler.transform(score_df))
score_df = score_df.rename(columns={0: "score"})
score_df["cid"] = queue_arg[0]
# 去掉cid前面的"diary|"
score_df["cid"] = score_df["cid"].apply(lambda x:x[6:])
# print("score_df:")
# print(score_df.head(1))
# print(score_df.shape)
if queue_arg[1] != []:
df_temp = pd.DataFrame(queue_arg[1]).rename(columns={0: "cid"})
df_temp["score"] = 0
df_temp = df_temp.sort_index(axis=1,ascending=False)
df_temp["cid"] = df_temp["cid"].apply(lambda x: x[6:])
predict_score_df = score_df.append(df_temp)
return predict_score_df
else:
return score_df
def get_score(queue_arg):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root',passwd='3SYz54LS9#^9sBvC', db='eagle')
cursor = db.cursor()
# 去除diary_id 前面的"diary|"
diary_list = tuple(list(map(lambda x:x[6:],queue_arg[2])))
sql = "select score,diary_id from biz_feed_diary_score where diary_id in {};".format(diary_list)
cursor.execute(sql)
result = cursor.fetchall()
score_df = pd.DataFrame(list(result)).dropna()
db.close()
return score_df
def update_dairy_queue(score_df,predict_score_df,total_video_id):
diary_id = score_df["cid"].values.tolist()
if total_video_id != []:
video_id = list(set(diary_id)&set(total_video_id))
if len(video_id)>0:
not_video = list(set(diary_id) - set(video_id))
# 为了相加时cid能够匹配,先把cid变成索引
not_video_df = score_df.loc[score_df["cid"].isin(not_video)].set_index(["cid"])
not_video_predict_df = predict_score_df.loc[predict_score_df["cid"].isin(not_video)].set_index(["cid"])
not_video_df["score"] = not_video_df["score"] + not_video_predict_df["score"]
not_video_df = not_video_df.sort_values(by="score", ascending=False)
video_df = score_df.loc[score_df["cid"].isin(video_id)].set_index(["cid"])
video_predict_df = predict_score_df.loc[predict_score_df["cid"].isin(video_id)].set_index(["cid"])
video_df["score"] = video_df["score"] + video_predict_df["score"]
video_df = video_df.sort_values(by="score", ascending=False)
not_video_id = not_video_df.index.tolist()
video_id = video_df.index.tolist()
new_queue = not_video_id
i = 1
for j in video_id:
new_queue.insert(i, j)
i += 5
# print("分数合并成功")
return new_queue
# 如果取交集后没有视频日记
else:
score_df = score_df.set_index(["cid"])
predict_score_df = predict_score_df.set_index(["cid"])
score_df["score"]=score_df["score"]+predict_score_df["score"]
score_df = score_df.sort_values(by="score", ascending=False)
# print("分数合并成功1")
return score_df.index.tolist()
# 如果total_video_id是空列表
else:
score_df = score_df.set_index(["cid"])
predict_score_df = predict_score_df.set_index(["cid"])
score_df["score"] = score_df["score"] + predict_score_df["score"]
score_df = score_df.sort_values(by="score", ascending=False)
# print("分数合并成功1")
return score_df.index.tolist()
def update_sql_dairy_queue(queue_name, diary_id,device_id, city_id):
db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris',
passwd='o5gbA27hXHHm', db='doris_prod')
cursor = db.cursor()
id_str = str(diary_id[0])
for i in range(1, len(diary_id)):
id_str = id_str + "," + str(diary_id[i])
sql = "update device_diary_queue set {}='{}' where device_id = '{}' and city_id = '{}'".format\
(queue_name,id_str,device_id, city_id)
cursor.execute(sql)
db.commit()
db.close()
print("成功写入diary_id")
def queue_compare(old_list, new_list):
global update_queue_numbers
print("更新日记队列总数:{}".format(update_queue_numbers))
# 去掉前面的"diary|"
old_list = list(map(lambda x: int(x[6:]),old_list))
# print("旧表前十个")
# print(old_list[:10])
# print("新表前十个")
# print(new_list[:10])
temp = list(range(len(old_list)))
x_dict = dict(zip(old_list, temp))
temp = list(range(len(new_list)))
y_dict = dict(zip(new_list, temp))
i = 0
for key in x_dict.keys():
if x_dict[key] != y_dict[key]:
i += 1
if i >0:
update_queue_numbers += 1
print("更新日记队列总数:{}".format(update_queue_numbers))
print("日记队列更新前日记总个数{},位置发生变化个数{},发生变化率{}%".format(len(old_list), i,
round(i / len(old_list) * 100), 2))
def get_queue(device_id, city_id,queue_name):
db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris',
passwd='o5gbA27hXHHm', db='doris_prod')
cursor = db.cursor()
sql = "select {} from device_diary_queue " \
"where device_id = '{}' and city_id = '{}';".format(queue_name,device_id, city_id)
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
if df.empty:
# print("该用户对应的日记为空")
return False
else:
queue_list = df.loc[0, 0].split(",")
queue_list = list(map(lambda x: "diary|" + str(x), queue_list))
db.close()
# print("成功获取queue")
return queue_list
def pipe_line(queue_name, queue_arg, device_id,total_video_id):
predict(queue_name, queue_arg, device_id)
predict_score_df = save_result(queue_name, queue_arg, device_id)
score_df = get_score(queue_arg)
if score_df.empty:
# print("获取的日记列表是空")
return False
else:
score_df = score_df.rename(columns={0: "score", 1: "cid"})
diary_queue = update_dairy_queue(score_df, predict_score_df,total_video_id)
return diary_queue
def user_update(device_id, city_id, queue_name,data_set_cid,total_video_id):
queue_list = get_queue(device_id, city_id, queue_name)
if queue_list:
queue_predict = list(set(queue_list) & set(data_set_cid))
queue_not_predict = list(set(queue_list) - set(data_set_cid))
queue_arg = [queue_predict, queue_not_predict, queue_list]
if queue_predict != []:
diary_queue = pipe_line(queue_name, queue_arg, device_id,total_video_id)
if diary_queue:
update_sql_dairy_queue(queue_name, diary_queue, device_id, city_id)
queue_compare(queue_list,diary_queue)
# print("更新结束")
else:
print("获取的日记列表是空,所以不更新日记队列")
else:
print("预测集是空,不需要预测")
else:
print("日记队列为空")
def multi_proecess_update(device_id, city_id, data_set_cid,total_video_id):
queue_name_list = ["native_queue","nearby_queue","nation_queue","megacity_queue"]
pool = Pool(4)
for queue_name in queue_name_list:
pool.apply_async(user_update, (device_id, city_id, queue_name,data_set_cid,total_video_id,))
pool.close()
pool.join()
if __name__ == "__main__":
warnings.filterwarnings("ignore")
total_number = 0
# 增加缓存日记视频列表
cache_video_id = []
cache_device_city_list = []
update_queue_numbers = 0
while True:
data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist()
total_video_id = get_video_id(cache_video_id)
cache_video_id = total_video_id
device_city_list = get_active_users()
print("过滤前用户数:{}".format(len(device_city_list)))
# 过滤掉5分钟内预测过的用户
device_city_list = list(set(tuple(device_city_list))-set(tuple(cache_device_city_list)))
print("过滤后用户数:{}".format(len(device_city_list)))
print("缓存视频个数:{}".format(len(cache_device_city_list)))
if datetime.now().minute % 5 == 0:
cache_device_city_list = []
if device_city_list != []:
cache_device_city_list.extend(device_city_list)
total_number += len(device_city_list)
print("累计预测用户总数:{}".format(total_number))
for device_city in device_city_list:
# start = time.time()
multi_proecess_update(device_city[0], device_city[1], data_set_cid, total_video_id)
# end = time.time()
# print("更新该用户队列耗时{}秒".format((end - start)))
# # TODO 上线后把预测用户改成多进程预测
import pymysql
import pandas as pd
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
# 从数据库获取数据,并将数据转化成DataFrame
def get_data(sql):
cursor = db.cursor()
cursor.execute(sql)
data = cursor.fetchall()
data = pd.DataFrame(list(data)).dropna()
return data
# 获取全国点击量TOP2000日记
sql = "select city_id,cid where cid_type = 'diary' order by click_count_choice desc limit 2000"
allCitiesTop2000 = get_data(sql)
allCitiesTop2000 = allCitiesTop2000.rename(columns={0:"city_id",1:"cid"})
allCitiesTop2000.to_csv("\home\zhangyanzhao\diaryTestSet\allCitiesTop2000.csv")
print("成功获取全国日记点击量TOP2000")
# 获取全国城市列表
sql = "select distinct city_id from data_feed_click"
cityList = get_data(sql)
cityList.to_csv("\home\zhangyanzhao\diaryTestSet\cityList.csv")
cityList = cityList[0].values.tolist()
print("成功获取城市列表")
# 获取每个城市点击量TOP2000日记,如果数量小于2000,用全国点击量TOP2000日记补充
for i in cityList:
sql = "select city_id,cid from data_feed_click " \
"where cid_type = 'diary' and city_id = {0} " \
"order by click_count_choice desc limit 2000".format(i)
data = get_data(sql)
data = data.rename(columns={0:"city_id",1:"cid"})
if data.shape[0]<2000:
n = 2000-data.shape[0]
# 全国点击量TOP2000日记中去除该城市的日记
temp = allCitiesTop2000[allCitiesTop2000["city_id"]!=i].loc[:n-1]
data = data.append(temp)
else:
pass
file_name = "\home\zhangyanzhao\diaryTestSet\{0}DiaryTop2000.csv".format(i)
data.to_csv(file_name)
print("end")
import xlearn as xl
from config import *
def train():
print("Start training")
ffm_model = xl.create_ffm()
ffm_model.setTrain(DIRECTORY_PATH + "train_ffm_data.csv")
ffm_model.setValidate(DIRECTORY_PATH + "validation_ffm_data.csv")
# log保存路径,如果不加这个参数,日志默认保存在/temp路径下,不符合规范
param = {'task': 'binary', 'lr': lr, 'lambda': l2_lambda, 'metric': 'auc',"log":DIRECTORY_PATH+"result"}
ffm_model.fit(param, DIRECTORY_PATH + "train/model.out")
print("predicting")
ffm_model.setTest(DIRECTORY_PATH + "test_ffm_data.csv")
ffm_model.setSigmoid()
ffm_model.predict(DIRECTORY_PATH + "train/model.out",DIRECTORY_PATH + "test_set_predict_output.txt")
#!/srv/envs/nvwa/bin/python
# -*- coding: utf-8 -*-
import pickle
import xlearn as xl
import pandas as pd
import pymysql
from datetime import datetime
# utils 包必须要导,否则ffm转化时用到的pickle找不到utils,会报错
import utils
import warnings
from multiprocessing import Pool
from userProfile import get_active_users
from sklearn.preprocessing import MinMaxScaler
import time
from config import *
import socket
def get_video_id(cache_video_id):
if flag:
db = pymysql.connect(host=ONLINE_EAGLE_HOST, port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
else:
# 本地数据库,没有密码,可能报错
db = pymysql.connect(host=LOCAL_EAGLE_HOST, port=4000, user='root', db='eagle')
cursor = db.cursor()
sql = "select diary_id from feed_diary_boost;"
try:
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
except Exception:
print("发生异常", Exception)
df = pd.DataFrame()
finally:
db.close()
if df.empty:
return cache_video_id
else:
video_id = df[0].values.tolist()
return video_id
# 将device_id、city_id拼接到对应的城市热门日记表。注意:下面预测集特征顺序要与训练集保持一致
def feature_en(x_list, device_id):
data = pd.DataFrame(x_list)
# 下面的列名一定要用cid,不能用diaryid,因为预测模型用到的ffm上是cid
data = data.rename(columns={0: "cid"})
data["device_id"] = device_id
now = datetime.now()
data["hour"] = now.hour
data["minute"] = now.minute
data.loc[data["hour"] == 0, ["hour"]] = 24
data.loc[data["minute"] == 0, ["minute"]] = 60
data["hour"] = data["hour"].astype("category")
data["minute"] = data["minute"].astype("category")
# 虽然预测y,但ffm转化需要y,并不影响预测结果
data["y"] = 0
print("done 特征工程")
return data
# 把ffm.pkl load进来,将上面的数据转化为ffm格式
def transform_ffm_format(df,queue_name,device_id):
with open(path + "ffm.pkl", "rb") as f:
ffm_format_pandas = pickle.load(f)
data = ffm_format_pandas.native_transform(df)
predict_file_name = path + "result/{0}_{1}.csv".format(device_id, queue_name)
data.to_csv(predict_file_name, index=False, header=None)
print("done ffm")
return predict_file_name
def predict(queue_name,queue_arg,device_id):
data = feature_en(queue_arg[0], device_id)
data_file_path = transform_ffm_format(data,queue_name,device_id)
ffm_model = xl.create_ffm()
ffm_model.setTest(data_file_path)
ffm_model.setSigmoid()
ffm_model.predict(path + "model.out",
path + "result/output{0}_{1}.csv".format(device_id, queue_name))
def save_result(queue_name,queue_arg,device_id):
score_df = pd.read_csv(path + "result/output{0}_{1}.csv".format(device_id, queue_name), header=None)
mm_scaler = MinMaxScaler()
mm_scaler.fit(score_df)
score_df = pd.DataFrame(mm_scaler.transform(score_df))
score_df = score_df.rename(columns={0: "score"})
score_df["cid"] = queue_arg[0]
# 去掉cid前面的"diary|"
score_df["cid"] = score_df["cid"].apply(lambda x:x[6:])
# print("score_df:")
# print(score_df.head(1))
# print(score_df.shape)
if queue_arg[1] != []:
df_temp = pd.DataFrame(queue_arg[1]).rename(columns={0: "cid"})
df_temp["score"] = 0
df_temp = df_temp.sort_index(axis=1,ascending=False)
df_temp["cid"] = df_temp["cid"].apply(lambda x: x[6:])
predict_score_df = score_df.append(df_temp)
return predict_score_df
else:
return score_df
def get_score(queue_arg):
if flag:
db = pymysql.connect(host='10.66.157.22', port=4000, user='root',passwd='3SYz54LS9#^9sBvC', db='eagle')
else:
db = pymysql.connect(host=LOCAL_HOST, port=3306, user='work', passwd='workwork', db='zhengxing_tes')
cursor = db.cursor()
# 去除diary_id 前面的"diary|"
diary_list = tuple(list(map(lambda x:x[6:],queue_arg[2])))
sql = "select score,diary_id from biz_feed_diary_score where diary_id in {};".format(diary_list)
cursor.execute(sql)
result = cursor.fetchall()
score_df = pd.DataFrame(list(result)).dropna()
db.close()
print("get score")
return score_df
def update_dairy_queue(score_df,predict_score_df,total_video_id):
diary_id = score_df["cid"].values.tolist()
if total_video_id != []:
video_id = list(set(diary_id)&set(total_video_id))
if len(video_id)>0:
not_video = list(set(diary_id) - set(video_id))
# 为了相加时cid能够匹配,先把cid变成索引
not_video_df = score_df.loc[score_df["cid"].isin(not_video)].set_index(["cid"])
not_video_predict_df = predict_score_df.loc[predict_score_df["cid"].isin(not_video)].set_index(["cid"])
not_video_df["score"] = not_video_df["score"] + not_video_predict_df["score"]
not_video_df = not_video_df.sort_values(by="score", ascending=False)
video_df = score_df.loc[score_df["cid"].isin(video_id)].set_index(["cid"])
video_predict_df = predict_score_df.loc[predict_score_df["cid"].isin(video_id)].set_index(["cid"])
video_df["score"] = video_df["score"] + video_predict_df["score"]
video_df = video_df.sort_values(by="score", ascending=False)
not_video_id = not_video_df.index.tolist()
video_id = video_df.index.tolist()
new_queue = not_video_id
i = 1
for j in video_id:
new_queue.insert(i, j)
i += 5
print("分数合并成功")
return new_queue
# 如果取交集后没有视频日记
else:
score_df = score_df.set_index(["cid"])
predict_score_df = predict_score_df.set_index(["cid"])
score_df["score"]=score_df["score"]+predict_score_df["score"]
score_df = score_df.sort_values(by="score", ascending=False)
print("分数合并成功1")
return score_df.index.tolist()
# 如果total_video_id是空列表
else:
score_df = score_df.set_index(["cid"])
predict_score_df = predict_score_df.set_index(["cid"])
score_df["score"] = score_df["score"] + predict_score_df["score"]
score_df = score_df.sort_values(by="score", ascending=False)
# print("分数合并成功1")
return score_df.index.tolist()
def update_sql_dairy_queue(queue_name, diary_id,device_id, city_id):
if flag:
db = pymysql.connect(host=QUEUE_ONLINE_HOST, port=3306, user='doris', passwd='o5gbA27hXHHm',
db='doris_prod')
else:
db = pymysql.connect(host=LOCAL_HOST, port=3306, user='work',passwd='workwork', db='doris_test')
cursor = db.cursor()
id_str = str(diary_id[0])
for i in range(1, len(diary_id)):
id_str = id_str + "," + str(diary_id[i])
sql = "update device_diary_queue set {}='{}' where device_id = '{}' and city_id = '{}'".format\
(queue_name,id_str,device_id, city_id)
cursor.execute(sql)
db.commit()
db.close()
print("成功写入diary_id")
def queue_compare(old_list, new_list):
# 去掉前面的"diary|"
old_list = list(map(lambda x: int(x[6:]),old_list))
# print("旧表前十个")
# print(old_list[:10])
# print("新表前十个")
# print(new_list[:10])
temp = list(range(len(old_list)))
x_dict = dict(zip(old_list, temp))
temp = list(range(len(new_list)))
y_dict = dict(zip(new_list, temp))
i = 0
for key in x_dict.keys():
if x_dict[key] != y_dict[key]:
i += 1
if i >0:
print("日记队列更新前日记总个数{},位置发生变化个数{},发生变化率{}%".format(len(old_list), i,
round(i / len(old_list) * 100), 2))
def get_queue(device_id, city_id,queue_name):
if flag:
db = pymysql.connect(host=QUEUE_ONLINE_HOST, port=3306, user='doris',passwd='o5gbA27hXHHm',
db='doris_prod')
else:
db = pymysql.connect(host=LOCAL_HOST, port=3306, user='work',
passwd='workwork', db='doris_test')
cursor = db.cursor()
sql = "select {} from device_diary_queue " \
"where device_id = '{}' and city_id = '{}';".format(queue_name,device_id, city_id)
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
if df.empty:
print("该用户对应的日记为空")
return False
else:
queue_list = df.loc[0, 0].split(",")
queue_list = list(map(lambda x: "diary|" + str(x), queue_list))
db.close()
print("成功获取queue")
return queue_list
def pipe_line(queue_name, queue_arg, device_id,total_video_id):
predict(queue_name, queue_arg, device_id)
predict_score_df = save_result(queue_name, queue_arg, device_id)
score_df = get_score(queue_arg)
if score_df.empty:
print("获取的日记列表是空")
return False
else:
score_df = score_df.rename(columns={0: "score", 1: "cid"})
diary_queue = update_dairy_queue(score_df, predict_score_df,total_video_id)
return diary_queue
def user_update(device_id, city_id, queue_name,data_set_cid,total_video_id):
queue_list = get_queue(device_id, city_id, queue_name)
if queue_list:
queue_predict = list(set(queue_list) & set(data_set_cid))
queue_not_predict = list(set(queue_list) - set(data_set_cid))
queue_arg = [queue_predict, queue_not_predict, queue_list]
if queue_predict != []:
diary_queue = pipe_line(queue_name, queue_arg, device_id,total_video_id)
if diary_queue:
update_sql_dairy_queue(queue_name, diary_queue, device_id, city_id)
queue_compare(queue_list,diary_queue)
# print("更新结束")
else:
print("获取的日记列表是空,所以不更新日记队列")
else:
print("预测集是空,不需要预测")
else:
print("日记队列为空")
def multi_proecess_update(device_id, city_id, data_set_cid,total_video_id):
queue_name_list = ["native_queue","nearby_queue","nation_queue","megacity_queue"]
pool = Pool(4)
for queue_name in queue_name_list:
pool.apply_async(user_update, (device_id, city_id, queue_name,data_set_cid,total_video_id,))
pool.close()
pool.join()
if __name__ == "__main__":
warnings.filterwarnings("ignore")
flag = True
path = DIRECTORY_PATH
# 下面这个ip是本地电脑ip
if socket.gethostbyname(socket.gethostname()) == '172.30.8.160':
flag = False
path = LOCAL_DIRCTORY
# 增加缓存日记视频列表
cache_video_id = []
cache_device_city_list = []
differ = 0
while True:
data_set_cid = pd.read_csv(path + "data_set_cid.csv")["cid"].values.tolist()
total_video_id = get_video_id(cache_video_id)
cache_video_id = total_video_id
start = time.time()
device_city_list = get_active_users(flag,path,differ)
# 过滤掉5分钟内预测过的用户
device_city_list = list(set(tuple(device_city_list))-set(tuple(cache_device_city_list)))
if datetime.now().minute % 5 == 0:
cache_device_city_list = []
if device_city_list != []:
cache_device_city_list.extend(device_city_list)
for device_city in device_city_list:
multi_proecess_update(device_city[0], device_city[1], data_set_cid, total_video_id)
differ = time.time()-start
# # TODO 上线后把预测用户改成多进程预测
import os
import time
def check():
out = os.popen("ps aux | grep diaryQueueUpdate.py").read()
flag = 1
for line in out.splitlines():
if 'python diaryQueueUpdate.py' in line:
flag = 2
return flag
if __name__ == "__main__":
#TODO 正式上线后,把下面的循环和time.sleep打开
# while True:
if check() == 1:
os.popen('python diaryQueueUpdate.py')
print("成功重启diaryQueueUpdate")
# time.sleep(300)
\ No newline at end of file
from config import *
import pandas as pd
import pickle
import xlearn as xl
from userProfile import *
import time
from utils import *
import os
# 将device_id、city_id拼接到对应的城市热门日记表。注意:下面预测集特征顺序要与训练集保持一致
def feature_en(user_profile):
file_name = DIRECTORY_PATH + "diaryTestSet/{0}DiaryTop3000.csv".format(user_profile['city_id'])
data = pd.read_csv(file_name)
data["device_id"] = user_profile['device_id']
now = datetime.now()
data["hour"] = now.hour
data["minute"] = now.minute
data.loc[data["hour"] == 0, ["hour"]] = 24
data.loc[data["minute"] == 0, ["minute"]] = 60
data["hour"] = data["hour"].astype("category")
data["minute"] = data["minute"].astype("category")
# 虽然预测y,但ffm转化需要y,并不影响预测结果
data["y"] = 0
data = data.drop("city_id", axis=1)
return data
# 把ffm.pkl load进来,将上面的表转化为ffm格式
def transform_ffm_format(df, device_id):
with open(DIRECTORY_PATH+"ffm.pkl","rb") as f:
ffm_format_pandas = pickle.load(f)
data = ffm_format_pandas.transform(df)
now = datetime.now().strftime("%Y-%m-%d-%H-%M")
predict_file_name = DIRECTORY_PATH + "result/{0}_{1}DiaryTop3000.csv".format(device_id, now)
data.to_csv(predict_file_name, index=False,header=None)
print("成功将ffm预测文件写到服务器")
return predict_file_name
# 将模型加载,预测,把预测日记的概率值按照降序排序,存到一个表里
def predict(user_profile):
instance = feature_en(user_profile)
instance_file_path = transform_ffm_format(instance, user_profile["device_id"])
ffm_model = xl.create_ffm()
ffm_model.setTest(instance_file_path)
ffm_model.setSigmoid()
ffm_model.predict(DIRECTORY_PATH + "model.out",
DIRECTORY_PATH + "result/{0}_output.txt".format(user_profile['device_id']))
print("该用户预测结束")
predict_save_to_local(user_profile, instance)
# 将预测结果与device_id 进行拼接,并按照概率降序排序
def wrapper_result(user_profile, instance):
proba = pd.read_csv(DIRECTORY_PATH +
"result/{0}_output.txt".format(user_profile['device_id']), header=None)
proba = proba.rename(columns={0: "prob"})
proba["cid"] = instance['cid']
proba = proba.sort_values(by="prob", ascending=False)
proba = proba.head(50)
return proba
# 预测候选集保存到本地
def predict_save_to_local(user_profile, instance):
proba = wrapper_result(user_profile, instance)
proba.loc[:, "url"] = proba["cid"].apply(lambda x: "http://m.igengmei.com/diary_book/" + str(x[6:]) + '/')
proba.to_csv(DIRECTORY_PATH + "result/feed_{}".format(user_profile['device_id']), index=False)
print("成功将预测候选集保存到本地")
def router(device_id):
user_profile, not_exist = fetch_user_profile(device_id)
if not_exist:
print('Sorry, we don\'t have you.')
else:
predict(user_profile)
# 多进程预测
def multi_predict(predict_list,processes=12):
pool = Pool(processes)
for device_id in predict_list:
start = time.time()
pool.apply_async(router, (device_id,))
end = time.time()
print("该用户{}预测耗时{}秒".format(device_id, (end - start)))
pool.close()
pool.join()
if __name__ == "__main__":
# TODO 如果耗时小于一分钟,下一次取到的device_id和上一次相同。还有一种情况,一个用户持续活跃,会被重复预测
while True:
empty,device_id_list = get_active_users()
if empty:
for eachFile in os.listdir("/tmp"):
if "xlearn" in eachFile:
os.remove("/tmp" + "/" + eachFile)
time.sleep(58)
else:
old_device_id_list = pd.read_csv(DIRECTORY_PATH + "data_set_device_id.csv")["device_id"].values.tolist()
# 求活跃用户和老用户的交集,也就是只预测老用户
predict_list = list(set(device_id_list) & set(old_device_id_list))
multi_predict(predict_list)
#TODO 上线前把预测流程中的计时器、打印代码删掉或者注释,因为预测对性能要求高,能少一条代码语句就少一条
from utils import con_sql
import datetime
import time
def fetch_data(start_date, end_date):
# 获取点击表里的device_id
sql = "select distinct device_id from data_feed_click2"
click_device_id = con_sql(sql)[0].values.tolist()
print("成功获取点击表里的device_id")
# 获取点击表里的数据
sql = "select cid,device_id,time,stat_date from data_feed_click2 " \
"where stat_date >= '{0}' and stat_date <= '{1}'".format(start_date, end_date)
click = con_sql(sql)
click = click.rename(columns={0: "cid", 1: "device_id", 2: "time_date", 3: "stat_date"})
print("成功获取点击表里的数据")
# 从time特征中抽取hour
click["hour"] = click["time_date"].apply(lambda x: datetime.datetime.fromtimestamp(x).hour)
click["minute"] = click["time_date"].apply(lambda x: datetime.datetime.fromtimestamp(x).minute)
click = click.drop("time_date", axis=1)
# 获取曝光表里的数据
sql = "select cid,device_id,time,stat_date from data_feed_exposure2 " \
"where stat_date >= '{0}' and stat_date <= '{1}'".format(start_date, end_date)
start = time.time()
exposure = con_sql(sql)
end = time.time()
print("获取曝光表耗时{}分".format((end-start)/60))
exposure = exposure.rename(columns={0: "cid", 1: "device_id", 2: "time_date", 3: "stat_date"})
print("成功获取曝光表里的数据")
# 从time特征中抽取hour
exposure["hour"] = exposure["time_date"].apply(lambda x: datetime.datetime.fromtimestamp(x).hour)
exposure["minute"] = exposure["time_date"].apply(lambda x: datetime.datetime.fromtimestamp(x).minute)
exposure = exposure.drop("time_date", axis=1)
return exposure, click, click_device_id
from dataProcess import *
from diaryTraining import *
import time
from utils import *
# 把数据获取、特征转换、模型训练的模型串联在一起
if __name__ == "__main__":
# while True:
# now = datetime.now()
# if (now.hour == 23) and (now.minute == 30):
start_train = time.time()
data_start_date, data_end_date, validation_date, test_date = get_date()
data, test_number, validation_number = feature_en(data_start_date, data_end_date,
validation_date,test_date)
ffm_transform(data, test_number, validation_number)
train()
end_train = time.time()
print("训练模型耗时{}分".format((end_train-start_train)/60))
move_file()
#TODO 如果用自己写的keepProcess文件守护进程,下面在这个函数里删掉重新启动进程那行代码,因为可能会多启动一次进程
# restart_process()
from utils import con_sql
from datetime import datetime
from config import *
import pandas as pd
import os
import time
import pymysql
import time
# 获取当下一分钟内活跃用户
def get_active_users(flag,path,differ):
if differ == 0:
end = time.time()
start = time.time()-60
elif 0 < differ < 10:
time.sleep(30)
differ += 30
end = time.time()
start = end - differ
else:
end = time.time()
start = end - differ
end_datetime = str(datetime.fromtimestamp(end))
start_datetime = str(datetime.fromtimestamp(start))
sql = "select device_id,city_id from user_active_time " \
"where active_time <= '{}' and active_time >= '{}'".format(end_datetime,start_datetime)
if flag:
df = con_sql(sql)
else:
db = pymysql.connect(host='192.168.15.12', port=4000, user='root', db='jerry_test')
sql = "select device_id,city_id from user_active_time"
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result)).dropna()
db.close()
if df.empty:
print("当下没有活跃用户数")
return []
# 统计活跃用户中尾号是6的用户数
else:
temp_list = df[0].values.tolist()
now = datetime.now()
tail6_file_path = path + "{}tail6Unique.csv".format(str(now)[:10])
if os.path.exists(tail6_file_path):
# 尾号是6的活跃用户数
tail_6_list = eval(pd.read_csv(tail6_file_path).loc[0, "list"])
else:
tail_6_list = []
tail_6_list.extend(list(filter(lambda x: (str(x)[-1] == "6"), temp_list)))
if tail_6_list != []:
df_tail_6 = pd.DataFrame({"number": [len(set(tail_6_list))], "time": [str(now)[:16]],
"list": [list(set(tail_6_list))]})
df_tail_6.to_csv(tail6_file_path, index=None)
print("截止现在尾号是6的独立活跃数:{}".format(len(set(tail_6_list))))
old_device_id_list = pd.read_csv(path + "data_set_device_id.csv")["device_id"].values.tolist()
# 求活跃用户和老用户的交集,也就是只预测老用户
df = df.loc[df[0].isin(old_device_id_list)]
if df.empty:
print("该列表是新用户,不需要预测")
return []
else:
# TODO 正式上线后注释下面的只预测尾号是6的代码
# 只预测尾号是6的ID,这块是测试要求的
device_temp_list = df[0].values.tolist()
predict_list = list(filter(lambda x: (str(x)[-1] == "6") or (str(x)=="358035085192742")
or str(x)=="AB20292B-5D15-4C44-9429-1C2FF5ED26F6",
device_temp_list))
if predict_list == []:
print('没有尾号是6和目标用户')
return []
else:
df = df.loc[df[0].isin(predict_list)]
device_list = df[0].values.tolist()
city_list = df[1].values.tolist()
device_city_list = list(zip(device_list, city_list))
print("当下这一分钟预测用户数量:{}".format(len(device_city_list)))
#统计尾号6的预测用户
predict_file_path = path + "{}predictTail6Unique.csv".format(str(now)[:10])
if os.path.exists(predict_file_path):
# 预测过尾号是6的用户数
all_predict_list = eval(pd.read_csv(predict_file_path).loc[0, "list"])
else:
all_predict_list = []
all_predict_list.extend(device_list)
if all_predict_list != []:
df_predict = pd.DataFrame({"number": [len(set(all_predict_list))], "time": [str(now)[:16]],
"list": [list(set(all_predict_list))]})
df_predict.to_csv(predict_file_path, index=None)
return device_city_list
def fetch_user_profile(device_id):
sql = "select device_id,city_id from data_feed_click where device_id = '{0}' limit 1".format(device_id)
user_profile = con_sql(sql)
if user_profile.empty:
print("没有获取到该用户对应的city_id")
return None,True
else:
user_profile = user_profile.rename(columns={0:"device_id",1:"city_id"})
user_profile_dict = {}
for i in user_profile.columns:
user_profile_dict[i] = user_profile.loc[0, i]
return user_profile_dict, False
# encoding = "utf-8"
from datetime import datetime
from datetime import timedelta
import pymysql
import numpy as np
import pandas as pd
from sklearn import metrics
from sklearn.metrics import auc
from multiprocessing import Pool
import os
import signal
from config import *
def get_date():
now = datetime.now()
year = now.year
month = now.month
day = now.day
date = datetime(year,month,day)
data_start_date = "2018-07-15"
data_end_date = "2018-08-30"
validation_date = "2018-08-29"
# data_start_date = (date - timedelta(days=3)).strftime("%Y-%m-%d")
# data_end_date = (date - timedelta(days=1)).strftime("%Y-%m-%d")
# validation_date = (date - timedelta(days=2)).strftime("%Y-%m-%d")
# 验证集和测试集的日期必须相差一天,否则切割数据集时会报错
test_date = data_end_date
print("data_start_date,data_end_date,validation_date,test_date:")
print(data_start_date,data_end_date,validation_date,test_date)
return data_start_date,data_end_date,validation_date,test_date
def get_roc_curve(y, pred, pos_label):
"""
计算二分类问题的roc和auc
"""
fpr, tpr, thresholds = metrics.roc_curve(y, pred, pos_label)
AUC = metrics.auc(fpr, tpr)
print(AUC)
# 从Tidb数据库的表里获取数据,并转化成df格式
def con_sql(sql):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result)).dropna()
db.close()
return df
def move_file():
import os
for eachFile in os.listdir(DIRECTORY_PATH+"train"):
os.rename(DIRECTORY_PATH+"train" + "/" + eachFile,DIRECTORY_PATH + eachFile)
print("成功将文件剪切到对应路径")
def restart_process():
out = os.popen("ps aux | grep diaryUpdateOnlineOffline.py").read()
for line in out.splitlines():
if 'python diaryUpdateOnlineOffline.py' in line:
pid = int(line.split()[1])
# 有些进程的生命周期非常短或者随时可能结束,一定要捕捉这个异常
try:
os.kill(pid, signal.SIGKILL)
print("已杀死python diaryUpdateOnlineOffline.py 进程")
except OSError:
print('没有如此进程!!!')
os.popen('python diaryUpdateOnlineOffline.py')
print("成功重启diaryUpdateOnlineOffline.py")
else:
os.popen('python diaryUpdateOnlineOffline.py')
print("成功重启diaryUpdateOnlineOffline.py")
# 多线程ffm转化类:
class multiFFMFormatPandas:
def __init__(self):
self.field_index_ = None
self.feature_index_ = None
self.y = None
def fit(self, df, y=None):
self.y = y
df_ffm = df[df.columns.difference([self.y])]
if self.field_index_ is None:
self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
if self.feature_index_ is not None:
last_idx = max(list(self.feature_index_.values()))
if self.feature_index_ is None:
self.feature_index_ = dict()
last_idx = 0
for col in df.columns:
vals = df[col].unique()
for val in vals:
if pd.isnull(val):
continue
name = '{}_{}'.format(col, val)
if name not in self.feature_index_:
self.feature_index_[name] = last_idx
last_idx += 1
self.feature_index_[col] = last_idx
last_idx += 1
return self
def fit_transform(self, df, y=None,n=50000,processes=4):
# n是每个线程运行最大的数据条数,processes是线程数
self.fit(df, y)
n = n
processes = processes
return self.transform(df,n,processes)
def transform_row_(self, row, t):
ffm = []
if self.y is not None:
ffm.append(str(row.loc[row.index == self.y][0]))
if self.y is None:
ffm.append(str(0))
for col, val in row.loc[row.index != self.y].to_dict().items():
col_type = t[col]
name = '{}_{}'.format(col, val)
if col_type.kind == 'O':
ffm.append('{}:{}:1'.format(self.field_index_[col], self.feature_index_[name]))
elif col_type.kind == 'i':
ffm.append('{}:{}:{}'.format(self.field_index_[col], self.feature_index_[col], val))
return ' '.join(ffm)
def transform(self, df,n=1500,processes=2):
# n是每个线程运行最大的数据条数,processes是线程数
t = df.dtypes.to_dict()
data_list = self.data_split_line(df,n)
# 设置进程的数量
pool = Pool(processes)
print("总进度: " + str(len(data_list)))
for i in range(len(data_list)):
data_list[i] = pool.apply_async(self.pool_function, (data_list[i], t,))
result_map = {}
for i in data_list:
result_map.update(i.get())
pool.close()
pool.join()
return pd.Series(result_map)
# 多进程计算方法
def pool_function(self, df, t):
return {idx: self.transform_row_(row, t) for idx, row in df.iterrows()}
# 切分数据方法,传人dataframe和切分条数的步长,返回dataframe的集合,每个dataframe中含有若干条数据
def data_split_line(self, data, step):
data_list = []
x = 0
while True:
if x + step < data.__len__():
data_list.append(data.iloc[x:x + step])
x = x + step + 1
else:
data_list.append(data.iloc[x:data.__len__()])
break
'''
# 返回生成器方法,但是本地测试效率不高
x = 0
while True:
if x + step < data.__len__():
yield data.iloc[x:x + step]
x = x + step + 1
else:
yield data.iloc[x:data.__len__()]
break
'''
return data_list
# 原生转化方法,不需要多进程
def native_transform(self, df):
t = df.dtypes.to_dict()
return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})
# 下面这个方法不是这个类原有的方法,是新增的。目的是用来判断这个用户是不是在训练数据集中存在
def is_feature_index_exist(self, name):
if name in self.feature_index_:
return True
else:
return False
# ffm 格式转换函数、类
# class FFMFormatPandas:
# def __init__(self):
# self.field_index_ = None
# self.feature_index_ = None
# self.y = None
#
# def fit(self, df, y=None):
# self.y = y
# df_ffm = df[df.columns.difference([self.y])]
# if self.field_index_ is None:
# self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
#
# if self.feature_index_ is not None:
# last_idx = max(list(self.feature_index_.values()))
#
# if self.feature_index_ is None:
# self.feature_index_ = dict()
# last_idx = 0
#
# for col in df.columns:
# vals = df[col].unique()
# for val in vals:
# if pd.isnull(val):
# continue
# name = '{}_{}'.format(col, val)
# if name not in self.feature_index_:
# self.feature_index_[name] = last_idx
# last_idx += 1
# self.feature_index_[col] = last_idx
# last_idx += 1
# return self
#
# def fit_transform(self, df, y=None):
# self.fit(df, y)
# return self.transform(df)
#
# def transform_row_(self, row, t):
# ffm = []
# if self.y is not None:
# ffm.append(str(row.loc[row.index == self.y][0]))
# if self.y is None:
# ffm.append(str(0))
#
# for col, val in row.loc[row.index != self.y].to_dict().items():
# col_type = t[col]
# name = '{}_{}'.format(col, val)
# if col_type.kind == 'O':
# ffm.append('{}:{}:1'.format(self.field_index_[col], self.feature_index_[name]))
# elif col_type.kind == 'i':
# ffm.append('{}:{}:{}'.format(self.field_index_[col], self.feature_index_[col], val))
# return ' '.join(ffm)
#
# def transform(self, df):
# t = df.dtypes.to_dict()
# return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})
#
# # 下面这个方法不是这个类原有的方法,是新增的。目的是用来判断这个用户是不是在训练数据集中存在
# def is_feature_index_exist(self, name):
# if name in self.feature_index_:
# return True
# else:
# return False
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment