Commit ad29164a authored by 高雅喆

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

update Top features format
parents da510ba8 df7a805c
@@ -3,6 +3,7 @@ import pandas as pd
from utils import *
from config import *
import numpy as np
import time
# Candidate cids may only be chosen from cids that appear in the training data set
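filter_cid is applied in the functions below but its definition falls outside this diff; here is a minimal, hypothetical sketch of how that constraint could be enforced, assuming a train_cids set of cids seen during training (the signature and the set are assumptions, not part of this commit):

# Hypothetical sketch -- the real filter_cid lives elsewhere in this repo
# Assumes train_cids is a set of cids that appeared in the training data
def filter_cid(data, train_cids=None):
    if train_cids is None:
        return data
    # Keep only candidate diaries whose cid was seen during training
    return data[data["cid"].isin(train_cids)].reset_index(drop=True)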
@@ -36,53 +37,6 @@ def get_cityList():
    return cityList
def pool_method(i, sql, allCitiesTop3000):
    data = con_sql(sql)
    data = data.rename(columns={0: "city_id", 1: "cid"})
    data = filter_cid(data)
    if data.shape[0] < 3000:
        n = 3000 - data.shape[0]
        # Pad with the nationwide top-3000-by-clicks diaries, excluding this city's own diaries
        temp = allCitiesTop3000[allCitiesTop3000["city_id"] != i].head(n)
        data = data.append(temp)
    file_name = DIRECTORY_PATH + "diaryTestSet/{0}DiaryTop3000.csv".format(i)
    data.to_csv(file_name, index=False)
    print("Successfully saved DiaryTop3000 for region {}".format(i))
# Split the city list into n chunks, collected into one list
# def split_cityList(cityList, n):
#     l = len(cityList)
#     step = max(1, int(np.rint(l / n)))
#     data_list = []
#     x = 0
#     while x + step < l:
#         data_list.append(cityList[x:x + step])
#         x = x + step
#     data_list.append(cityList[x:l])
#     return data_list
# Process-pool draft for fetching popular diaries for cities nationwide
# def multi_get_eachCityDiaryTop3000(processes):
#     cityList = get_cityList()
#     allCitiesTop3000 = get_allCitiesDiaryTop3000()
#
#     pool = Pool(processes)
#     data_list = split_cityList(cityList, processes)
#     for i in range(len(data_list)):
#         data_list[i] = pool.apply_async(pool_method, (data_list[i], allCitiesTop3000,))
#
#     for r in data_list:
#         r.get()
#     pool.close()
#     pool.join()
def get_eachCityDiaryTop3000():
    # Fetch each city's top-3000 diaries by click count; if a city has fewer
    # than 3000, pad with the nationwide top-3000 diaries
    cityList = get_cityList()
@@ -106,6 +60,40 @@ def get_eachCityDiaryTop3000():
        data.to_csv(file_name, index=False)
        print("Successfully saved DiaryTop3000 for region {}".format(i))
def pool_method(city, sql, allCitiesTop3000):
    data = con_sql(sql)
    data = data.rename(columns={0: "city_id", 1: "cid"})
    data = filter_cid(data)
    if data.shape[0] < 3000:
        n = 3000 - data.shape[0]
        # Pad with the nationwide top-3000-by-clicks diaries, excluding this city's own diaries
        temp = allCitiesTop3000[allCitiesTop3000["city_id"] != city].head(n)
        data = data.append(temp)
    file_name = DIRECTORY_PATH + "diaryTestSet/{0}DiaryTop3000.csv".format(city)
    data.to_csv(file_name, index=False)
    print("Successfully saved DiaryTop3000 for region {}".format(city))
# Fetch every city's popular diaries in parallel with a multiprocessing Pool
def multi_get_eachCityDiaryTop3000(processes=8):
    city_list = get_cityList()
    allCitiesTop3000 = get_allCitiesDiaryTop3000()
    pool = Pool(processes)
    for city in city_list:
        sql = "select city_id,cid from data_feed_click " \
              "where cid_type = 'diary' and city_id = '{0}' group by cid " \
              "order by max(click_count_choice) desc limit 3000".format(city)
        pool.apply_async(pool_method, (city, sql, allCitiesTop3000,))
    pool.close()
    pool.join()
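One caveat with the fire-and-forget apply_async calls above: any exception raised inside pool_method is silently dropped because the AsyncResult handles are never collected. A minimal variant that surfaces worker errors; the _checked name and the results list are illustrative additions, not part of this commit:

# Variant that keeps the AsyncResult handles so worker exceptions re-raise
def multi_get_eachCityDiaryTop3000_checked(processes=8):
    city_list = get_cityList()
    allCitiesTop3000 = get_allCitiesDiaryTop3000()
    pool = Pool(processes)
    results = []
    for city in city_list:
        sql = "select city_id,cid from data_feed_click " \
              "where cid_type = 'diary' and city_id = '{0}' group by cid " \
              "order by max(click_count_choice) desc limit 3000".format(city)
        results.append(pool.apply_async(pool_method, (city, sql, allCitiesTop3000,)))
    pool.close()
    pool.join()
    for r in results:
        r.get()  # re-raises any exception thrown inside pool_method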
if __name__ == "__main__":
    get_eachCityDiaryTop3000()
    start = time.time()
    multi_get_eachCityDiaryTop3000()
    end = time.time()
    print("Fetching each city's popular diaries took {} minutes".format((end - start) / 60))
from utils import *
import datetime
import pickle
import time
if __name__ == '__main__':
    data = pd.read_csv("../data/test-data/raw-exposure.csv")[["cid", "device_id"]]
    data["y"] = 1
    test_data = data.tail(1)
    ffm = FFMFormatPandas()
    data = ffm.fit_transform(data, y='y')
    data.to_csv("../data/ffm_data.csv", index=False)
    with open("../data/ffm.object", "wb") as f:
        pickle.dump(ffm, f)
    with open("../data/ffm.object", "rb") as f:
        ffm = pickle.load(f)
    result = ffm.transform(test_data)
    print(result)
    data_1 = pd.read_csv("../data/ffm_data.csv", header=None).tail(5)
    print(data_1)
if __name__ == '__main__':
    df = pd.read_csv("/Users/mac/PycharmProjects/nvwa/ffm-baseline/data/test-data/大数据.csv")
    for i in range(500, 10000, 500):
        start = time.time()
        ffm = multiFFMFormatPandas()
        data = ffm.fit_transform(df, y="y", n=i, processes=3)
        end = time.time()
        print("Chunk size {} took {} seconds".format(i, end - start))
# -*- coding: utf-8 -*-
import pymysql
import pandas as pd
import numpy as np
import redis
# Fetch data from a TiDB table and convert it to a DataFrame
def con_sql(sql):
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    df = pd.DataFrame(list(result)).dropna()
    db.close()
    return df
# Write data to redis
# TODO The production redis address has not been provided; the address below is the test environment's and must be changed to the production address
def add_data_to_redis(key, val):
    r = redis.StrictRedis(host='10.30.50.58', port=6379, db=12)
    r.set(key, val)
    # Set the key's TTL so it expires after 36 hours
    r.expire(key, 36 * 60 * 60)
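A minimal usage sketch tying the two helpers together: pull rows with con_sql, serialize, and cache with add_data_to_redis. The SQL, the key-naming scheme, and the helper name are illustrative assumptions, not part of this commit:

import json

# Illustrative only: cache a device's top diary cids for 36 hours
def cache_device_recommendations(device_id):
    sql = "select cid from data_feed_click where device_id = '{0}' limit 10".format(device_id)
    df = con_sql(sql)
    cids = df[0].tolist()  # con_sql returns integer-labeled columns
    add_data_to_redis("device_diary_top:{}".format(device_id), json.dumps(cids))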
# FFM format conversion functions and classes
class FFMFormatPandas:
    def __init__(self):
        self.field_index_ = None    # column name -> field id
        self.feature_index_ = None  # "col_value" name -> feature id
        self.y = None               # name of the label column

    def fit(self, df, y=None):
        self.y = y
        df_ffm = df[df.columns.difference([self.y])]
        # Each non-label column becomes one FFM field
        if self.field_index_ is None:
            self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
        if self.feature_index_ is not None:
            last_idx = max(list(self.feature_index_.values()))
        if self.feature_index_ is None:
            self.feature_index_ = dict()
            last_idx = 0
        # Assign a feature id to every distinct (column, value) pair
        for col in df.columns:
            vals = df[col].unique()
            for val in vals:
                if pd.isnull(val):
                    continue
                name = '{}_{}'.format(col, val)
                if name not in self.feature_index_:
                    self.feature_index_[name] = last_idx
                    last_idx += 1
            self.feature_index_[col] = last_idx
            last_idx += 1
        return self
    def fit_transform(self, df, y=None):
        self.fit(df, y)
        return self.transform(df)
    def transform_row_(self, row, t):
        ffm = []
        # The label goes first; default to 0 when no label column was given
        if self.y is not None:
            ffm.append(str(row.loc[row.index == self.y][0]))
        if self.y is None:
            ffm.append(str(0))
        for col, val in row.loc[row.index != self.y].to_dict().items():
            col_type = t[col]
            name = '{}_{}'.format(col, val)
            # Categorical (object) columns are one-hot encoded: field:feature:1
            if col_type.kind == 'O':
                ffm.append('{}:{}:1'.format(self.field_index_[col], self.feature_index_[name]))
            # Integer columns keep their value: field:feature:value
            elif col_type.kind == 'i':
                ffm.append('{}:{}:{}'.format(self.field_index_[col], self.feature_index_[col], val))
        return ' '.join(ffm)
    def transform(self, df):
        t = df.dtypes.to_dict()
        return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})
    # This method is not part of the original class; it was added to check
    # whether a given feature (e.g. a user) exists in the training data
    def is_feature_index_exist(self, name):
        return name in self.feature_index_
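A small round-trip sketch of FFMFormatPandas, showing the libffm-style "label field:feature:value" lines it emits; the toy column names and values below are illustrative only, not from this commit:

import pandas as pd

# Toy frame: two categorical columns plus a binary label
toy = pd.DataFrame({"cid": ["diary_1", "diary_2"],
                    "device_id": ["dev_a", "dev_b"],
                    "y": [1, 0]})
ffm = FFMFormatPandas()
out = ffm.fit_transform(toy, y="y")
# Each emitted line reads "label field:feature:1" per categorical value,
# e.g. the first row becomes "1 0:0:1 1:3:1"
print(out.tolist())
print(ffm.is_feature_index_exist("device_id_dev_a"))  # True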
@@ -75,7 +75,7 @@ def ffm_transform(data, test_number, validation_number):
print("Start ffm transform")
start = time.time()
ffm_train = multiFFMFormatPandas()
data = ffm_train.fit_transform(data, y='y',n=200000,processes=6)
data = ffm_train.fit_transform(data, y='y',n=200000,processes=8)
with open(DIRECTORY_PATH+"ffm.pkl", "wb") as f:
pickle.dump(ffm_train, f)
......
from processData import *
from diaryTraining import *
from diaryCandidateSet import get_eachCityDiaryTop3000
from diaryCandidateSet import multi_get_eachCityDiaryTop3000
from utils import get_date
@@ -10,17 +10,20 @@ if __name__ == "__main__":
    # while True:
    #     now = datetime.now()
    #     if (now.hour == 23) and (now.minute == 30):
    start = time.time()
    start_train = time.time()
    data_start_date, data_end_date, validation_date, test_date = get_date()
    data, test_number, validation_number = feature_en(data_start_date, data_end_date,
                                                      validation_date, test_date)
    ffm_transform(data, test_number, validation_number)
    train()
    end_train = time.time()
    print("Model training took {} minutes".format((end_train - start_train) / 60))
    print('---------------prepare candidates--------------')
    start = time.time()
    multi_get_eachCityDiaryTop3000()
    end = time.time()
    print("Model training took {} minutes".format((end - start) / 60))
    # print('---------------prepare candidates--------------')
    # get_eachCityDiaryTop3000()
    # print("end")
    print("Fetching each city's popular diaries took {} minutes".format((end - start) / 60))
    print("end")
......
@@ -89,7 +89,7 @@ class multiFFMFormatPandas:
        return self

    def fit_transform(self, df, y=None, n=1000000, processes=6):
    def fit_transform(self, df, y=None, n=200000, processes=8):
        # n is the maximum number of rows each worker handles; processes is the number of worker processes
        self.fit(df, y)
        n = n
@@ -112,7 +112,7 @@ class multiFFMFormatPandas:
                ffm.append('{}:{}:{}'.format(self.field_index_[col], self.feature_index_[col], val))
        return ' '.join(ffm)

    def transform(self, df, n=10000, processes=2):
    def transform(self, df, n=1500, processes=2):
        # n is the maximum number of rows each worker handles; processes is the number of worker processes
        t = df.dtypes.to_dict()
        data_list = self.data_split_line(df, n)
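data_split_line is called here but defined outside this hunk; a minimal sketch of what such a splitter plausibly looks like, assuming it only needs to cut the frame into chunks of at most n rows (the real method in the repo may differ):

    # Hypothetical sketch -- the real data_split_line lives outside this hunk
    def data_split_line(self, df, n):
        # Cut df into consecutive chunks of at most n rows each
        return [df.iloc[i:i + n] for i in range(0, len(df), n)]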
......