Commit 4ad04627 authored by 高雅喆's avatar 高雅喆

use python ffm_encoder

parent 7e8d57c2
#coding=utf-8
import pymysql
import pandas as pd
from multiprocessing import Pool
import numpy as np
import datetime
import time
from sqlalchemy import create_engine
def con_sql(db,sql):
cursor = db.cursor()
try:
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result)).dropna()
except Exception:
print("发生异常", Exception)
df = pd.DataFrame()
finally:
db.close()
return df
# def test():
# sql = "select max(update_time) from ffm_diary_queue"
# db = pymysql.connect(host='192.168.15.12', port=4000, user='root', db='eagle')
# cursor = db.cursor()
# cursor.execute(sql)
# result = cursor.fetchone()[0]
# db.close()
# print(result)
class multiFFMFormatPandas:
def __init__(self):
self.field_index_ = None
self.feature_index_ = None
self.y = None
def fit(self, df, y=None):
self.y = y
df_ffm = df[df.columns.difference([self.y])]
if self.field_index_ is None:
self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
if self.feature_index_ is not None:
last_idx = max(list(self.feature_index_.values()))
if self.feature_index_ is None:
self.feature_index_ = dict()
for col in df.columns:
self.feature_index_[col] = 1
last_idx = 1
vals = df[col].unique()
for val in vals:
if pd.isnull(val):
continue
name = '{}_{}'.format(col, val)
if name not in self.feature_index_:
self.feature_index_[name] = last_idx
last_idx += 1
return self
def fit_transform(self, df, y=None,n=50000,processes=4):
# n是每个线程运行最大的数据条数,processes是线程数
self.fit(df, y)
n = n
processes = processes
return self.transform(df,n,processes)
def transform_row_(self, row, t):
ffm = []
for col, val in row.loc[row.index != self.y].to_dict().items():
col_type = t[col]
name = '{}_{}'.format(col, val)
if col_type.kind == 'O':
ffm.append('{}:{}:1'.format(self.field_index_[col]+1, self.feature_index_[name]))
elif col_type.kind != 'O':
ffm.append('{}:{}:{}'.format(self.field_index_[col]+1, self.feature_index_[col], val))
result = ' '.join(ffm)
if self.y is not None:
result = str(row.loc[row.index == self.y][0]) + "," + result
if self.y is None:
result = str(0) + "," + result
return result
def transform(self, df,n=1500,processes=2):
# n是每个线程运行最大的数据条数,processes是线程数
t = df.dtypes.to_dict()
data_list = self.data_split_line(df,n)
# 设置进程的数量
pool = Pool(processes)
print("总进度: " + str(len(data_list)))
for i in range(len(data_list)):
data_list[i] = pool.apply_async(self.pool_function, (data_list[i], t,))
result_map = {}
for i in data_list:
result_map.update(i.get())
pool.close()
pool.join()
return pd.Series(result_map)
# 多进程计算方法
def pool_function(self, df, t):
return {idx: self.transform_row_(row, t) for idx, row in df.iterrows()}
# 切分数据方法,传人dataframe和切分条数的步长,返回dataframe的集合,每个dataframe中含有若干条数据
def data_split_line(self, data, step):
data_list = []
x = 0
while True:
if x + step < data.__len__():
data_list.append(data.loc[x:x + step])
x = x + step + 1
else:
data_list.append(data.loc[x:data.__len__()])
break
return data_list
# 原生转化方法,不需要多进程
def native_transform(self, df):
t = df.dtypes.to_dict()
return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})
# 下面这个方法不是这个类原有的方法,是新增的。目的是用来判断这个用户是不是在训练数据集中存在
def is_feature_index_exist(self, name):
if name in self.feature_index_:
return True
else:
return False
def get_data():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_data"
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=15)).strftime("%Y-%m-%d")
print(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.device_id,e.y,e.z,e.stat_date,e.ucity_id,e.cid_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel," \
"home.jingxuan,home.zhibo,home.nose,home.eyes,home.weizheng,home.teeth,home.lunkuo," \
"home.meifu,home.xizhi,home.zhifang,home.longxiong,home.simi,home.maofa,home.gongli,home.korea " \
"from esmm_train_data e left join user_feature u on e.device_id = u.device_id " \
"left join home_tab_click home on e.device_id = home.device_id " \
"where e.stat_date >= '{}'".format(start)
df = con_sql(db, sql)
df = df.rename(columns={0: "device_id", 1: "y", 2: "z", 3: "stat_date", 4: "ucity_id", 5: "cid_id",
6: "clevel1_id", 7: "ccity_name"})
print("esmm data ok")
print(df.head(2))
ucity_id = list(set(df["ucity_id"].values.tolist()))
cid = list(set(df["cid_id"].values.tolist()))
df["clevel1_id"] = df["clevel1_id"].astype("str")
df["cid_id"] = df["cid_id"].astype("str")
df["y"] = df["y"].astype("str")
df["z"] = df["z"].astype("str")
df["y"] = df["stat_date"].str.cat([df["device_id"].values.tolist(),df["ucity_id"].values.tolist(), df["cid_id"].values.tolist(),
df["y"].values.tolist(),df["z"].values.tolist()], sep=",")
df = df.drop(["z","device_id"], axis=1).fillna(0.0)
print(df.head(2))
print("fields:{}".format(df.shape[1]-1))
print("features:{}".format(len(cid)))
return df,validate_date,ucity_id,cid
def transform(a,validate_date):
model = multiFFMFormatPandas()
df = model.fit_transform(a, y="y", n=160000, processes=26)
df = pd.DataFrame(df)
df["stat_date"] = df[0].apply(lambda x: x.split(",")[0])
df["device_id"] = df[0].apply(lambda x: x.split(",")[1])
df["city_id"] = df[0].apply(lambda x: x.split(",")[2])
df["cid"] = df[0].apply(lambda x: x.split(",")[3])
df["number"] = np.random.randint(1, 2147483647, df.shape[0])
df["seq"] = list(range(df.shape[0]))
df["seq"] = df["seq"].astype("str")
df["data"] = df[0].apply(lambda x: ",".join(x.split(",")[4:]))
df["data"] = df["seq"].str.cat(df["data"], sep=",")
df = df.drop([0,"seq"], axis=1)
print(df.head(2))
train = df[df["stat_date"] != validate_date]
train = train.drop("stat_date",axis=1)
test = df[df["stat_date"] == validate_date]
test = test.drop("stat_date",axis=1)
print("train shape")
print(train.shape)
train.to_csv(path + "tr.csv", sep="\t", index=False)
test.to_csv(path + "te.csv", sep="\t", index=False)
return model
def get_predict_set(ucity_id, cid,model):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.device_id,e.y,e.z,e.stat_date,e.ucity_id,e.cid_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel," \
"home.jingxuan,home.zhibo,home.nose,home.eyes,home.weizheng,home.teeth,home.lunkuo," \
"home.meifu,home.xizhi,home.zhifang,home.longxiong,home.simi,home.maofa,home.gongli,home.korea,e.label " \
"from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
"left join home_tab_click home on e.device_id = home.device_id"
df = con_sql(db, sql)
df = df.rename(columns={0: "device_id", 1: "y", 2: "z", 3: "stat_date", 4: "ucity_id", 5: "cid_id",
6: "clevel1_id", 7: "ccity_name",26:"label"})
print("before filter:")
print(df.shape)
df = df[df["cid_id"].isin(cid)]
print("after cid filter:")
print(df.shape)
df = df[df["ucity_id"].isin(ucity_id)]
print("after ucity filter:")
print(df.shape)
df["clevel1_id"] = df["clevel1_id"].astype("str")
df["cid_id"] = df["cid_id"].astype("str")
df["y"] = df["y"].astype("str")
df["z"] = df["z"].astype("str")
df["label"] = df["label"].astype("str")
df["y"] = df["label"].str.cat(
[df["device_id"].values.tolist(), df["ucity_id"].values.tolist(), df["cid_id"].values.tolist(),
df["y"].values.tolist(), df["z"].values.tolist()], sep=",")
df = df.drop(["z","label","device_id"], axis=1).fillna(0.0)
print(df.head(2))
df = model.transform(df,n=160000, processes=22)
df = pd.DataFrame(df)
df["label"] = df[0].apply(lambda x: x.split(",")[0])
df["device_id"] = df[0].apply(lambda x: x.split(",")[1])
df["city_id"] = df[0].apply(lambda x: x.split(",")[2])
df["cid"] = df[0].apply(lambda x: x.split(",")[3])
df["number"] = np.random.randint(1, 2147483647, df.shape[0])
df["seq"] = list(range(df.shape[0]))
df["seq"] = df["seq"].astype("str")
df["data"] = df[0].apply(lambda x: ",".join(x.split(",")[4:]))
df["data"] = df["seq"].str.cat(df["data"], sep=",")
df = df.drop([0, "seq"], axis=1)
print(df.head())
native_pre = df[df["label"] == "0"]
native_pre = native_pre.drop("label", axis=1)
native_pre.to_csv(path+"native.csv",sep="\t",index=False)
# print("native_pre shape")
# print(native_pre.shape)
nearby_pre = df[df["label"] == "1"]
nearby_pre = nearby_pre.drop("label", axis=1)
nearby_pre.to_csv(path + "nearby.csv", sep="\t", index=False)
# print("nearby_pre shape")
# print(nearby_pre.shape)
if __name__ == "__main__":
path = "/home/gaoyazhe/data/"
a = time.time()
df, validate_date, ucity_id, cid = get_data()
model = transform(df, validate_date)
get_predict_set(ucity_id, cid,model)
b = time.time()
print("cost(分钟)")
print((b-a)/60)
\ No newline at end of file
#!/usr/bin/env python
#coding=utf-8
from __future__ import absolute_import
......
#!/usr/bin/env python
#coding=utf-8
#from __future__ import absolute_import
......
# -*- coding: utf-8 -*-
#coding=utf-8
import smtplib
from email.mime.text import MIMEText
......
#coding=utf-8
from sqlalchemy import create_engine
import pandas as pd
import pymysql
......
......@@ -15,11 +15,8 @@ rm ${DATA_PATH}/va/*
rm ${DATA_PATH}/native/*
rm ${DATA_PATH}/nearby/*
echo "mysql to csv"
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_train" > ${DATA_PATH}/tr.csv
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_cv" > ${DATA_PATH}/va.csv
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_infer_native" > ${DATA_PATH}/native.csv
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_infer_nearby" > ${DATA_PATH}/nearby.csv
echo "data2ffm"
${PYTHON_PATH} ${MODEL_PATH}/Feature_pipline/data2ffm.py
echo "split data"
split -l $((`wc -l < ${DATA_PATH}/tr.csv`/15)) ${DATA_PATH}/tr.csv -d -a 4 ${DATA_PATH}/tr/tr_ --additional-suffix=.csv
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment