Commit 5610158f authored by 张彦钊

add multi_ffm function

parent 52081b94
from eda.ml_tools.rocCurve import get_roc_curve
import pandas as pd
from utils import *
from config import *
if __name__ == "__main__":
......
DIRECTORY_PATH = '/data2/models/'
# The test date must be later than the validation date, because the dataset-splitting code is written with that assumption
VALIDATION_DATE = '2018-08-05'
TEST_DATE = '2018-08-06'
DATA_START_DATE = '2018-07-05'
......
......@@ -12,8 +12,7 @@ def train():
param = {'task': 'binary', 'lr': lr, 'lambda': l2_lambda, 'metric': 'auc'}
ffm_model.fit(param, DIRECTORY_PATH + "model_{0}-{1}_lr{2}_lambda{3}.out".format(DATA_START_DATE,
DATA_END_DATE, lr, l2_lambda))
ffm_model.fit(param, DIRECTORY_PATH + "model_lr{}_lambda{}.out".format(lr, l2_lambda))
print("predicting")
ffm_model.setTest(DIRECTORY_PATH + "test{0}.csv".format(TEST_DATE))
......
import pandas as pd
from sklearn import metrics
from sklearn.metrics import auc
# import argparse
def get_roc_curve(y, pred, pos_label):
    """
    Compute the ROC curve and AUC for a binary classification problem.
    """
    fpr, tpr, thresholds = metrics.roc_curve(y, pred, pos_label=pos_label)
    AUC = metrics.auc(fpr, tpr)
    print(AUC)
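A minimal usage sketch of get_roc_curve; the labels and scores below are toy values for illustration only:
# Toy call on illustrative data; prints 0.75 for these four samples.
y_true = [0, 0, 1, 1]
y_score = [0.1, 0.4, 0.35, 0.8]
get_roc_curve(y_true, y_score, pos_label=1)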
from config import *
import pandas as pd
import pickle
......@@ -54,8 +52,7 @@ def predict(user_profile):
ffm_model = xl.create_ffm()
ffm_model.setTest(instance_file_path)
ffm_model.setSigmoid()
ffm_model.predict(DIRECTORY_PATH + "model_{0}-{1}_lr{2}_lambda{3}.out".format(DATA_START_DATE,
DATA_END_DATE, lr, l2_lambda),
ffm_model.predict(DIRECTORY_PATH + "model_lr{}_lambda{}.out".format(lr, l2_lambda),
DIRECTORY_PATH + "result/{0}_output.txt".format(user_profile['device_id']))
print("预测结束")
predict_save_to_local(user_profile, instance)
......
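xlearn writes one prediction per line to the output file passed to predict(); with setSigmoid() these are probabilities. A sketch of reading them back (the device_id below is a placeholder, not a real id from the project):
# Sketch: load xlearn's per-line sigmoid outputs; "some_device_id" is hypothetical.
output_path = DIRECTORY_PATH + "result/{0}_output.txt".format("some_device_id")
with open(output_path) as f:
    scores = [float(line) for line in f if line.strip()]
print("loaded {} predictions".format(len(scores)))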
......@@ -2,7 +2,7 @@
import time
from prepareData import fetch_data
from utils import FFMFormatPandas
from utils import *
import pandas as pd
from config import *
import pickle
......@@ -55,12 +55,12 @@ def feature_en():
print(data.head(2))
# persist the candidate cids
data_set_cid = data["cid"].unique()
cid_df = pd.DataFrame()
cid_df['cid'] = data_set_cid
print("data_set_cid :")
print(cid_df.head(2))
cid_df.to_csv(DIRECTORY_PATH + "data_set_cid.csv", index=False)
# data_set_cid = data["cid"].unique()
# cid_df = pd.DataFrame()
# cid_df['cid'] = data_set_cid
# print("data_set_cid :")
# print(cid_df.head(2))
# cid_df.to_csv(DIRECTORY_PATH + "data_set_cid.csv", index=False)
# Save the device_ids so that, at prediction time, we can check whether a device_id is in this set; if it is not, there is no need to predict for it
data_set_device_id = data["device_id"].unique()
......@@ -76,14 +76,14 @@ def ffm_transform(data, test_number, validation_number):
print("Start ffm transform")
start = time.time()
ffm_train = FFMFormatPandas()
data = ffm_train.fit_transform(data, y='y')
ffm_train = multiFFMFormatPandas()
data = ffm_train.fit_transform(data, y='y',n=50000,processes=6)
with open(DIRECTORY_PATH+"ffm_{0}_{1}.pkl".format(DATA_START_DATE,DATA_END_DATE), "wb") as f:
pickle.dump(ffm_train, f)
print("done transform ffm")
end = time.time()
print("ffm转化数据耗时:")
print("ffm转化数据耗时(秒):")
print(end - start)
data.to_csv(DIRECTORY_PATH + "data{0}-{1}.csv".format(DATA_START_DATE, DATA_END_DATE), index=False)
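Because ffm_train is pickled above, the prediction side can reload it later and reuse the learned feature indices; a sketch (the feature value is a placeholder):
# Sketch: reload the pickled transformer; feature names follow the "<col>_<val>" pattern.
with open(DIRECTORY_PATH + "ffm_{0}_{1}.pkl".format(DATA_START_DATE, DATA_END_DATE), "rb") as f:
    ffm_train = pickle.load(f)
print(ffm_train.is_feature_index_exist("device_id_placeholder"))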
......
from processData import *
from diaryTraining import *
from diaryCandidateSet import get_eachCityDiaryTop3000
from datetime import datetime
from datetime import timedelta
# Chain together data fetching, feature transformation, and model training
......@@ -14,3 +14,6 @@ if __name__ == "__main__":
print('---------------prepare candidates--------------')
get_eachCityDiaryTop3000()
print("end")
# encoding = "utf-8"
import pymysql
import pandas as pd
import numpy as np
import redis
from sklearn import metrics
from sklearn.metrics import auc
from multiprocessing import Pool
def get_roc_curve(y, pred, pos_label):
    """
    Compute the ROC curve and AUC for a binary classification problem.
    """
    fpr, tpr, thresholds = metrics.roc_curve(y, pred, pos_label=pos_label)
    AUC = metrics.auc(fpr, tpr)
    print(AUC)
# Fetch data from a TiDB table and convert it to a DataFrame
def con_sql(sql):
......@@ -18,13 +28,142 @@ def con_sql(sql):
db.close()
return df
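The body of con_sql is collapsed in this diff; a typical pymysql implementation of the behavior described in the comment might look like this sketch (connection parameters are placeholders, not the project's real ones):
# Hypothetical sketch only; host/user/password/db are placeholders.
def con_sql_sketch(sql):
    db = pymysql.connect(host="127.0.0.1", user="user", password="password", db="database")
    cursor = db.cursor()
    cursor.execute(sql)
    df = pd.DataFrame(list(cursor.fetchall()))
    db.close()
    return df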
# write data to Redis
# TODO the production Redis address has not been provided; the address below belongs to
# the test environment and needs to be changed to the production address
def add_data_to_redis(key,val):
    r = redis.StrictRedis(host='10.30.50.58', port=6379, db = 12)
def add_data_to_redis(key, val):
    r = redis.StrictRedis(host='10.30.50.58', port=6379, db=12)
    r.set(key, val)
    # set the key's expiry time: expire after 36 hours
    r.expire(key,36*60*60)
    r.expire(key, 36 * 60 * 60)
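A usage sketch (the key naming scheme and value are illustrative; non-string values need to be serialized first):
# Illustrative call; "candidates:some_device_id" is a hypothetical key.
import json
add_data_to_redis("candidates:some_device_id", json.dumps([101, 102, 103]))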
# Multi-process FFM format conversion class:
class multiFFMFormatPandas:
    def __init__(self):
        self.field_index_ = None
        self.feature_index_ = None
        self.y = None
    def fit(self, df, y=None):
        self.y = y
        df_ffm = df[df.columns.difference([self.y])]
        if self.field_index_ is None:
            self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
        if self.feature_index_ is not None:
            # resume numbering after the largest existing index
            last_idx = max(list(self.feature_index_.values())) + 1
        if self.feature_index_ is None:
            self.feature_index_ = dict()
            last_idx = 0
        for col in df.columns:
            vals = df[col].unique()
            for val in vals:
                if pd.isnull(val):
                    continue
                name = '{}_{}'.format(col, val)
                if name not in self.feature_index_:
                    self.feature_index_[name] = last_idx
                    last_idx += 1
            self.feature_index_[col] = last_idx
            last_idx += 1
        return self
    def fit_transform(self, df, y=None, n=10000, processes=5):
        # n is the maximum number of rows per worker task; processes is the number of worker processes
        self.fit(df, y)
        return self.transform(df, n=n, processes=processes)
    def transform_row_(self, row, t):
        ffm = []
        if self.y is not None:
            ffm.append(str(row.loc[row.index == self.y].iloc[0]))
        if self.y is None:
            ffm.append(str(0))
        for col, val in row.loc[row.index != self.y].to_dict().items():
            col_type = t[col]
            name = '{}_{}'.format(col, val)
            if col_type.kind == 'O':
                # categorical (object) column: field:feature:1
                ffm.append('{}:{}:1'.format(self.field_index_[col], self.feature_index_[name]))
            elif col_type.kind == 'i':
                # integer column: field:column-level feature index:raw value
                ffm.append('{}:{}:{}'.format(self.field_index_[col], self.feature_index_[col], val))
        return ' '.join(ffm)
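    # Illustrative output of transform_row_ (indices hypothetical): a row with y=1,
    # an object column city='beijing' and an integer column age=30 would become
    #   1 0:12:1 1:7:30
    # i.e. a label followed by field:feature:value triples (the libffm text format).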
    # def transform(self, df):
    #     t = df.dtypes.to_dict()
    #     return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})
    def transform(self, df, n=10000, processes=5):
        # n is the maximum number of rows per worker task; processes is the number of worker processes
        t = df.dtypes.to_dict()
        data_list = self.data_split_line(df, n)
        # set the number of worker processes
        pool = Pool(processes=processes)
        for i in range(len(data_list)):
            print("progress: " + str(i + 1) + "/" + str(len(data_list)))
            data_list[i] = pool.apply_async(self.pool_function, (data_list[i], t,))
        result_map = {}
        for i in data_list:
            result_map.update(i.get())
        '''
        # generator-based variant, to be used with the generator version of data_split_line
        result_map = {}
        for i in data_list:
            s = pool.apply_async(self.pool_function, (i, t,))
            result_map.update(s.get())
        '''
        pool.close()
        pool.join()
        return pd.Series(result_map)
    # function executed in each worker process
    def pool_function(self, df, t):
        return {idx: self.transform_row_(row, t) for idx, row in df.iterrows()}
    # Split the data: takes a DataFrame and a chunk-size step, and returns a list of
    # DataFrames, each holding up to `step` rows.
    def data_split_line(self, data, step):
        data_list = []
        x = 0
        while True:
            if x + step < len(data):
                data_list.append(data.iloc[x:x + step])
                x = x + step  # iloc's end index is exclusive, so the next chunk starts at x + step
            else:
                data_list.append(data.iloc[x:len(data)])
                break
        '''
        # generator-based variant, but local testing showed no efficiency gain
        x = 0
        while True:
            if x + step < len(data):
                yield data.iloc[x:x + step]
                x = x + step
            else:
                yield data.iloc[x:len(data)]
                break
        '''
        return data_list
    # This method is not part of the original class; it was added to check whether a
    # given feature (e.g. a user's device_id) appeared in the training data.
    def is_feature_index_exist(self, name):
        return name in self.feature_index_
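A minimal end-to-end sketch of the class above on toy data (the output path is illustrative; the __main__ guard matters because multiprocessing re-imports the module in child processes on some platforms):
if __name__ == "__main__":
    toy = pd.DataFrame({"y": [1, 0, 1],
                        "city": ["bj", "sh", "bj"],
                        "cid": ["a", "b", "c"]})
    ffm = multiFFMFormatPandas()
    ffm_data = ffm.fit_transform(toy, y="y", n=2, processes=2)
    ffm_data.to_csv("/tmp/toy_ffm.csv", index=False)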
# FFM format conversion function/class
class FFMFormatPandas:
......@@ -82,6 +221,7 @@ class FFMFormatPandas:
    def transform(self, df):
        t = df.dtypes.to_dict()
        return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})
    # This method is not part of the original class; it was added to check whether a
    # given feature (e.g. a user's device_id) appeared in the training data.
    def is_feature_index_exist(self, name):
        if name in self.feature_index_:
......