Commit 1a160230 authored by 张彦钊

esmm: new feature engineering, add new features

parent 9ca8b104
import pandas as pd
import pymysql
import datetime


def con_sql(db, sql):
    cursor = db.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
        df = pd.DataFrame(list(result))
    except Exception as e:
        # On failure, log the exception and return an empty frame so callers can proceed.
        print("exception occurred:", e)
        df = pd.DataFrame()
    finally:
        db.close()
    return df
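# Aside: pandas can also run the query directly with a similar result; a
# minimal alternative sketch (closing the connection is then up to the caller):
#   df = pd.read_sql(sql, db)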
def get_data():
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select max(stat_date) from esmm_train_data"
    validate_date = con_sql(db, sql)[0].values.tolist()[0]
    print("validate_date:" + validate_date)
    temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
    start = (temp - datetime.timedelta(days=60)).strftime("%Y-%m-%d")
    print(start)

    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name," \
          "u.device_type,u.manufacturer,u.channel,c.top,cl.l1,cl.l2,e.device_id,cut.time " \
          "from esmm_train_data e left join user_feature u on e.device_id = u.device_id " \
          "left join cid_type_top c on e.device_id = c.device_id " \
          "left join cid_level2 cl on e.cid_id = cl.cid " \
          "left join cid_time_cut cut on e.cid_id = cut.cid " \
          "where e.stat_date >= '{}'".format(start)
    df = con_sql(db, sql)
    df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
                            6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "l1", 11: "l2",
                            12: "device_id", 13: "time"})
    print("esmm data ok")
    print("shape before dedup:")
    print(df.shape)
    df = df.drop_duplicates()
    df = df.drop_duplicates(["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
                             "channel", "top", "l1", "l2", "time", "stat_date"])
    print("shape after dedup:")
    print(df.shape)

    unique_values = []
    features = ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
                "channel", "top", "time", "stat_date"]
    for i in features:
        # fillna must run before astype("str"), otherwise NaN is frozen into the string "nan".
        df[i] = df[i].fillna("lost")
        df[i] = df[i].astype("str")
        # Suffix the column name so identical raw values in different columns map to distinct ids.
        df[i] = df[i] + i
        unique_values.extend(list(df[i].unique()))
    for i in ["l1", "l2"]:
        df[i] = df[i].fillna("lost")
        df[i] = df[i].astype("str")
        # Values in l1 and l2 share the same category space as top.
        df[i] = df[i] + "top"
        unique_values.extend(list(df[i].unique()))

    print("number of unique feature values:")
    print(len(unique_values))
    print(df.head(2))
    # Ids start at 1, so 0 stays free for categories unseen at training time.
    temp = list(range(1, len(unique_values) + 1))
    value_map = dict(zip(unique_values, temp))

    df = df.drop("device_id", axis=1)
    train = df
    test = df[df["stat_date"] == validate_date + "stat_date"].copy()
    for i in ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
              "channel", "top", "l1", "time", "stat_date", "l2"]:
        train[i] = train[i].map(value_map)
        test[i] = test[i].map(value_map)

    print("train shape")
    print(train.shape)
    print("test shape")
    print(test.shape)
    write_csv(train, "tr", 100000)
    write_csv(test, "va", 80000)
    return validate_date, value_map
def write_csv(df, name, n):
    # Write df under path/<name>/ in chunks of at most n rows
    # (iloc clamps the final slice, so no bounds check is needed).
    for i in range(0, df.shape[0], n):
        temp = df.iloc[i:i + n]
        temp.to_csv(path + name + "/{}_{}.csv".format(name, i), index=False)
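# Example: write_csv(train, "tr", 100000) writes chunks of at most 100000 rows
# to path + "tr/tr_0.csv", path + "tr/tr_100000.csv", and so on.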
def get_predict(date, value_map):
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    # NOTE: the query is currently restricted to a single device_id.
    sql = "select e.y,e.z,e.label,e.ucity_id,e.clevel1_id,e.ccity_name," \
          "u.device_type,u.manufacturer,u.channel,c.top,cl.l1,cl.l2,e.device_id,e.cid_id,cut.time " \
          "from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
          "left join cid_type_top c on e.device_id = c.device_id " \
          "left join cid_level2 cl on e.cid_id = cl.cid " \
          "left join cid_time_cut cut on e.cid_id = cut.cid where device_id = '358035085192742'"
    df = con_sql(db, sql)
    df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
                            6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "l1", 11: "l2",
                            12: "device_id", 13: "cid_id", 14: "time"})
    df["stat_date"] = date
    print("predict shape")
    print(df.shape)
    df["uid"] = df["device_id"]
    df["city"] = df["ucity_id"]

    features = ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
                "channel", "top", "time", "stat_date"]
    for i in features:
        df[i] = df[i].fillna("lost")
        df[i] = df[i].astype("str")
        df[i] = df[i] + i
    for i in ["l1", "l2"]:
        df[i] = df[i].fillna("lost")
        df[i] = df[i].astype("str")
        # Values in l1 and l2 share the same category space as top.
        df[i] = df[i] + "top"

    native_pre = df[df["label"] == 0]
    native_pre = native_pre.drop("label", axis=1)
    nearby_pre = df[df["label"] == 1]
    nearby_pre = nearby_pre.drop("label", axis=1)
    for i in ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
              "channel", "top", "l1", "time", "stat_date", "l2"]:
        native_pre[i] = native_pre[i].map(value_map)
        # TODO: categories not covered by value_map become NaN; fill with 0 for now, improve later.
        native_pre[i] = native_pre[i].fillna(0)
        nearby_pre[i] = nearby_pre[i].map(value_map)
        # TODO: same as above.
        nearby_pre[i] = nearby_pre[i].fillna(0)

    print("native")
    print(native_pre.shape)
    print(native_pre.head())
    native_pre[["uid", "city", "cid_id"]].to_csv(path + "native.csv", index=False)
    write_csv(native_pre, "native", 200000)
    print("nearby")
    print(nearby_pre.shape)
    print(nearby_pre.head())
    nearby_pre[["uid", "city", "cid_id"]].to_csv(path + "nearby.csv", index=False)
    write_csv(nearby_pre, "nearby", 160000)


if __name__ == '__main__':
    path = "/home/gmuser/esmm_data/"
    date, value = get_data()
    get_predict(date, value)
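The TODO comments in get_predict fall back to NaN plus fillna(0) for categories missing from value_map. Since the ids built in get_data start at 1, 0 is free to act as an explicit "unknown" id; a minimal sketch of that idea (map_with_default is a hypothetical helper, not part of this commit):

def map_with_default(series, value_map, default=0):
    # Categories unseen at training time get the reserved id 0 directly,
    # avoiding the NaN + fillna(0) round-trip.
    return series.map(lambda v: value_map.get(v, default))

Usage would then be native_pre[i] = map_with_default(native_pre[i], value_map).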
# coding=utf-8
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
import datetime

my_sender = 'gaoyazhe@igengmei.com'
my_pass = 'VCrKTui99a7ALhiK'
my_user1 = 'gaoyazhe@igengmei.com'
my_user2 = 'zhangyanzhao@igengmei.com'


def mail():
    ret = True
    try:
        with open('/home/gmuser/esmm_data/submit.log') as f:
            stat_data = f.read()
        msg = MIMEText(stat_data, 'plain', 'utf-8')
        msg['From'] = formataddr(["高雅喆", my_sender])
        msg['To'] = my_user1 + ',' + my_user2
        # Subject: "<date>-esmm multi-task model training metrics"
        msg['Subject'] = str(datetime.date.today()) + "-esmm多目标模型训练指标统计"
        server = smtplib.SMTP_SSL("smtp.exmail.qq.com", 465)
        server.login(my_sender, my_pass)
        server.sendmail(my_sender, [my_user1, my_user2], msg.as_string())
        server.quit()
    except Exception:
        ret = False
    return ret


ret = mail()
if ret:
    print("mail sent successfully")
else:
    print("mail sending failed")
#! /bin/bash
cd /srv/apps/ffm-baseline/eda/esmm
git checkout master
PYTHON_PATH=/home/gaoyazhe/miniconda3/bin/python
-MODEL_PATH=/srv/apps/ffm-baseline/eda/esmm
+MODEL_PATH=/srv/apps/ffm-baseline/eda/esmm/Model_pipline
DATA_PATH=/home/gmuser/esmm_data
echo "start"
notExce(){
    current=$(date "+%Y-%m-%d %H:%M:%S")
    timeStamp=$(date -d "$current" +%s)
    currentTimeStamp=$((timeStamp*1000+`date "+%N"`/1000000))
    echo $current
}
echo "rm leave tfrecord"
rm ${DATA_PATH}/tr/*
rm ${DATA_PATH}/va/*
@@ -21,25 +11,14 @@
rm ${DATA_PATH}/native/*
rm ${DATA_PATH}/nearby/*
rm -r ${DATA_PATH}/model_ckpt/DeepCvrMTL/201*
echo "data2ffm"
${PYTHON_PATH} ${MODEL_PATH}/Feature_pipline/data2ffm.py > ${DATA_PATH}/infer.log
all_sample=$((`cat ${DATA_PATH}/tr.csv | awk -F '\t' '{print$5}' | awk -F ',' '{print$2$3$4}' | sort | uniq | wc -l`))
uniq_feat=$((`cat ${DATA_PATH}/tr.csv | awk -F '\t' '{print$5}' | awk -F ',' '{print$4}' | sort | uniq -u | wc -l`))
repe_feat=$((all_sample-uniq_feat))
echo "Bayes Error Rate": $((repe_feat*100/all_sample))%
echo "split data"
split -l $((`wc -l < ${DATA_PATH}/tr.csv`/15)) ${DATA_PATH}/tr.csv -d -a 4 ${DATA_PATH}/tr/tr_ --additional-suffix=.csv
split -l $((`wc -l < ${DATA_PATH}/va.csv`/5)) ${DATA_PATH}/va.csv -d -a 4 ${DATA_PATH}/va/va_ --additional-suffix=.csv
split -l $((`wc -l < ${DATA_PATH}/native.csv`/15)) ${DATA_PATH}/native.csv -d -a 4 ${DATA_PATH}/native/native_ --additional-suffix=.csv
split -l $((`wc -l < ${DATA_PATH}/nearby.csv`/5)) ${DATA_PATH}/nearby.csv -d -a 4 ${DATA_PATH}/nearby/nearby_ --additional-suffix=.csv
echo "data"
${PYTHON_PATH} ${MODEL_PATH}/feature.py > ${DATA_PATH}/infer.log
echo "csv to tfrecord"
-${PYTHON_PATH} ${MODEL_PATH}/Feature_pipline/get_tfrecord.py --input_dir=${DATA_PATH}/tr/ --output_dir=${DATA_PATH}/tr/
-${PYTHON_PATH} ${MODEL_PATH}/Feature_pipline/get_tfrecord.py --input_dir=${DATA_PATH}/va/ --output_dir=${DATA_PATH}/va/
-${PYTHON_PATH} ${MODEL_PATH}/Feature_pipline/get_tfrecord.py --input_dir=${DATA_PATH}/native/ --output_dir=${DATA_PATH}/native/
-${PYTHON_PATH} ${MODEL_PATH}/Feature_pipline/get_tfrecord.py --input_dir=${DATA_PATH}/nearby/ --output_dir=${DATA_PATH}/nearby/
+${PYTHON_PATH} ${MODEL_PATH}/to_tfrecord.py --input_dir=${DATA_PATH}/tr/ --output_dir=${DATA_PATH}/tr/
+${PYTHON_PATH} ${MODEL_PATH}/to_tfrecord.py --input_dir=${DATA_PATH}/va/ --output_dir=${DATA_PATH}/va/
+${PYTHON_PATH} ${MODEL_PATH}/to_tfrecord.py --input_dir=${DATA_PATH}/native/ --output_dir=${DATA_PATH}/native/
+${PYTHON_PATH} ${MODEL_PATH}/to_tfrecord.py --input_dir=${DATA_PATH}/nearby/ --output_dir=${DATA_PATH}/nearby/
cat ${DATA_PATH}/tr/*.tfrecord > ${DATA_PATH}/tr/tr.tfrecord
cat ${DATA_PATH}/va/*.tfrecord > ${DATA_PATH}/va/va.tfrecord
@@ -53,17 +32,15 @@
rm ${DATA_PATH}/nearby/nearby_*
echo "train..."
-${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=8 --feature_size=2000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train
+${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=2 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=1460 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train
echo "infer native..."
-${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=8 --feature_size=2000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer > ${DATA_PATH}/infer.log
+${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=1460 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer > ${DATA_PATH}/infer.log
echo "infer nearby..."
-${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=8 --feature_size=2000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer > ${DATA_PATH}/infer.log
+${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=1460 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer > ${DATA_PATH}/infer.log
echo "sort and 2sql"
-${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/sort_and_2sql.py
-${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/send_mail.py
+${PYTHON_PATH} ${MODEL_PATH}/to_database.py
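The new train.py calls pass --field_size=11 and (per the diff below) parse "ids" as FixedLenFeature([11]); both assume eleven id columns per example, matching the column list in feature.py and to_tfrecord.py. A quick sanity check, under that assumption:

# Assumed invariant: these are the 11 id columns packed per example.
fields = ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
          "channel", "top", "l1", "time", "stat_date", "l2"]
assert len(fields) == 11  # must equal --field_size and FixedLenFeature([11])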
@@ -3,7 +3,6 @@
from sqlalchemy import create_engine
import pandas as pd
import pymysql
-import MySQLdb
import time

def con_sql(sql):
@@ -19,31 +18,41 @@ def con_sql(sql):
    return result
-def set_join(lst):
-    r = [str(i) for i in lst.unique().tolist()]
-    r = r[:500]
-    return ','.join(r)
+def nearby_set_join(lst):
+    # return ','.join([str(i) for i in list(lst)])
+    return ','.join([str(i) for i in lst.unique().tolist()])
+
+def native_set_join(lst):
+    l = lst.unique().tolist()
+    d = int(len(l) / 2)
+    if d == 0:
+        d = 1
+    r = [str(i) for i in l]
+    r = r[:d]
+    return ','.join(r)
def main():
    # native queue
-   df2 = pd.read_csv('/home/gmuser/esmm_data/native.csv',usecols=[0,1,2],header=0,names=['uid','city','cid_id'],sep='\t')
+   df2 = pd.read_csv('/home/gmuser/esmm_data/native.csv')
    df2['cid_id'] = df2['cid_id'].astype(str)
    df1 = pd.read_csv("/home/gmuser/esmm_data/native/pred.txt", sep='\t', header=None, names=["ctr", "cvr", "ctcvr"])
    df2["ctr"], df2["cvr"], df2["ctcvr"] = df1["ctr"], df1["cvr"], df1["ctcvr"]
-   df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
+   df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':native_set_join}).reset_index(drop=False)
    df3.columns = ["device_id", "city_id", "native_queue"]
    print("native_device_count", df3.shape)

    # nearby queue
-   df2 = pd.read_csv('/home/gmuser/esmm_data/nearby.csv',usecols=[0,1,2],header=0,names=['uid','city','cid_id'],sep='\t')
+   df2 = pd.read_csv('/home/gmuser/esmm_data/nearby.csv')
    df2['cid_id'] = df2['cid_id'].astype(str)
    df1 = pd.read_csv("/home/gmuser/esmm_data/nearby/pred.txt", sep='\t', header=None, names=["ctr", "cvr", "ctcvr"])
    df2["ctr"], df2["cvr"], df2["ctcvr"] = df1["ctr"], df1["cvr"], df1["ctcvr"]
-   df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
+   df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':nearby_set_join}).reset_index(drop=False)
    df4.columns = ["device_id", "city_id", "nearby_queue"]
    print("nearby_device_count", df4.shape)
......
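For illustration, a toy run of the sort-then-aggregate pattern used in main above, assuming native_set_join from this diff is in scope:

import pandas as pd

# Three impressions for one (uid, city) pair, two unique cids.
df2 = pd.DataFrame({"uid": [1, 1, 1], "city": ["bj", "bj", "bj"],
                    "cid_id": ["10", "11", "10"], "ctcvr": [0.3, 0.9, 0.1]})
df3 = (df2.sort_values(by="ctcvr", ascending=False)
          .groupby(by=["uid", "city"])
          .agg({"cid_id": native_set_join})
          .reset_index(drop=False))
print(df3["cid_id"].tolist())  # ['11']: two unique cids -> keep the first half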
# coding=utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import pandas as pd
import sys
import os
import glob
import tensorflow as tf
import numpy as np
import re
from multiprocessing import Pool as ThreadPool

flags = tf.app.flags
FLAGS = flags.FLAGS
LOG = tf.logging

tf.app.flags.DEFINE_string("input_dir", "./", "input dir")
tf.app.flags.DEFINE_string("output_dir", "./", "output dir")
tf.app.flags.DEFINE_integer("threads", 16, "threads num")


def gen_tfrecords(in_file):
    basename = os.path.basename(in_file) + ".tfrecord"
    out_file = os.path.join(FLAGS.output_dir, basename)
    tfrecord_out = tf.python_io.TFRecordWriter(out_file)
    df = pd.read_csv(in_file)
    # The 11 id columns packed into each example, in a fixed order.
    feats = ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
             "channel", "top", "l1", "time", "stat_date", "l2"]
    for i in range(df.shape[0]):
        id = np.array([])
        for j in feats:
            id = np.append(id, df[j][i])
        features = tf.train.Features(feature={
            "y": tf.train.Feature(float_list=tf.train.FloatList(value=[df["y"][i]])),
            "z": tf.train.Feature(float_list=tf.train.FloatList(value=[df["z"][i]])),
            "ids": tf.train.Feature(int64_list=tf.train.Int64List(value=id.astype(np.int)))
        })
        example = tf.train.Example(features=features)
        serialized = example.SerializeToString()
        tfrecord_out.write(serialized)
    tfrecord_out.close()


def main(_):
    if not os.path.exists(FLAGS.output_dir):
        os.mkdir(FLAGS.output_dir)
    file_list = glob.glob(os.path.join(FLAGS.input_dir, "*.csv"))
    print("total files: %d" % len(file_list))
    pool = ThreadPool(FLAGS.threads)  # sets the pool size
    pool.map(gen_tfrecords, file_list)
    pool.close()
    pool.join()


if __name__ == "__main__":
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run()
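A minimal sketch of reading these records back with the TF 1.x API, mirroring the feature spec that the train.py diff below expects (the filename is a placeholder):

import tensorflow as tf

def parse_record(record):
    # Must mirror gen_tfrecords: scalar y/z plus exactly 11 int64 ids.
    spec = {
        "y": tf.FixedLenFeature([], tf.float32),
        "z": tf.FixedLenFeature([], tf.float32),
        "ids": tf.FixedLenFeature([11], tf.int64),
    }
    return tf.parse_single_example(record, spec)

dataset = tf.data.TFRecordDataset(["tr.tfrecord"]).map(parse_record)  # placeholder path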
@@ -53,9 +53,9 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
    features = {
        "y": tf.FixedLenFeature([], tf.float32),
        "z": tf.FixedLenFeature([], tf.float32),
-       "feat_ids": tf.FixedLenFeature([FLAGS.field_size], tf.int64)
-       #"feat_vals": tf.FixedLenFeature([None], tf.float32),
+       "ids": tf.FixedLenFeature([11], tf.int64)
    }
    parsed = tf.parse_single_example(record, features)
    y = parsed.pop('y')
    z = parsed.pop('z')
@@ -98,15 +98,7 @@ def model_fn(features, labels, mode, params):
    #------build weights------
    Feat_Emb = tf.get_variable(name='embeddings', shape=[feature_size, embedding_size], initializer=tf.glorot_normal_initializer())

    #------build feature-------
-   #{U-A-X-C features that need no special handling}
-   feat_ids = features['feat_ids']
-   #feat_vals = features['feat_vals']
-   #{User multi-hot}
-   #{Ad}
-   #{X multi-hot}
-   #x_intids = features['x_intids']
-   #x_intvals = features['x_intvals']
+   feat_ids = features['ids']

    if FLAGS.task_type != "infer":
        y = labels['y']
@@ -114,10 +106,9 @@ def model_fn(features, labels, mode, params):
    #------build f(x)------
    with tf.variable_scope("Shared-Embedding-layer"):
-       common_embs = tf.nn.embedding_lookup(Feat_Emb, feat_ids)  # None * F' * K
-       #common_embs = tf.multiply(common_embs, feat_vals)
+       embedding_id = tf.nn.embedding_lookup(Feat_Emb, feat_ids)
-       x_concat = tf.concat([tf.reshape(common_embs, shape=[-1, common_dims])], axis=1)  # None * (F * K)
+       x_concat = tf.reshape(embedding_id, shape=[-1, common_dims])  # None * (F * K)

    with tf.name_scope("CVR_Task"):
        if mode == tf.estimator.ModeKeys.TRAIN:
@@ -348,20 +339,6 @@ def main(_):
            fo.write("%f\t%f\t%f\n" % (prob['pctr'], prob['pcvr'], prob['pctcvr']))
    elif FLAGS.task_type == 'export':
        print("Not Implemented, Do It Yourself!")
-       #feature_spec = tf.feature_column.make_parse_example_spec(feature_columns)
-       #feature_spec = {
-       #    'feat_ids': tf.FixedLenFeature(dtype=tf.int64, shape=[None, FLAGS.field_size]),
-       #    'feat_vals': tf.FixedLenFeature(dtype=tf.float32, shape=[None, FLAGS.field_size])
-       #}
-       #serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
-       #feature_spec = {
-       #    'feat_ids': tf.placeholder(dtype=tf.int64, shape=[None, FLAGS.field_size], name='feat_ids'),
-       #    'feat_vals': tf.placeholder(dtype=tf.float32, shape=[None, FLAGS.field_size], name='feat_vals')
-       #}
-       #serving_input_receiver_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(feature_spec)
-       #Estimator.export_savedmodel(FLAGS.servable_model_dir, serving_input_receiver_fn)
if __name__ == "__main__":
    tf.logging.set_verbosity(tf.logging.INFO)
......
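For reference, the reshape in Shared-Embedding-layer presumably relies on common_dims = field_size * embedding_size (common_dims is defined outside the hunks shown); with the pipeline flags field_size=11 and embedding_size=16, each example flattens to a 176-wide vector:

# Assumed relation, not shown in this diff:
field_size, embedding_size = 11, 16
common_dims = field_size * embedding_size  # 176, the width of x_concat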