Commit fc38dc9a authored by 王志伟's avatar 王志伟
parents f5b71cd1 c7799bcc
...@@ -18,9 +18,11 @@ def con_sql(sql): ...@@ -18,9 +18,11 @@ def con_sql(sql):
db.close() db.close()
return result return result
def set_join(lst): def set_join(lst):
# return ','.join([str(i) for i in list(lst)]) r = [str(i) for i in lst.unique().tolist()]
return ','.join([str(i) for i in lst.unique().tolist()]) r =r[:500]
return ','.join(r)
def main(): def main():
...@@ -73,7 +75,7 @@ def main(): ...@@ -73,7 +75,7 @@ def main():
cur = con.cursor() cur = con.cursor()
cur.execute(delete_str) cur.execute(delete_str)
con.commit() con.commit()
df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False) df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False,chunksize=8000)
except Exception as e: except Exception as e:
print(e) print(e)
......
import pandas as pd
import pymysql
from sqlalchemy import create_engine
def con_sql(db,sql):
cursor = db.cursor()
try:
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
except Exception:
print("发生异常", Exception)
df = pd.DataFrame()
finally:
db.close()
return df
def cut_map(x):
if 0 < x <= 5:
return 2
elif 5 < x <= 10:
return 3
elif 10 < x <= 15:
return 4
elif 15 < x <= 20:
return 5
elif 20 < x <= 40:
return 6
else:
return 7
def cut():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select cid_id,time from cid_time"
df = con_sql(db, sql)
df = df.rename(columns={0: "cid", 1: "time"})
print(df.shape)
part_1 = df.loc[df["time"] == 0]
part_2 = df.loc[df["time"] != 0]
part_1["time"] = 1
part_2["time"] = part_2["time"].map(cut_map)
merge = part_1.append(part_2)
print(merge.shape)
yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@10.66.157.22:4000/jerry_test?charset=utf8')
pd.io.sql.to_sql(merge, "cid_time_cut", yconnect, schema='jerry_test', if_exists='replace', index=False)
if __name__ == "__main__":
cut()
import pandas as pd
import pymysql
import datetime
def con_sql(db,sql):
cursor = db.cursor()
try:
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
except Exception:
print("发生异常", Exception)
df = pd.DataFrame()
finally:
db.close()
return df
def get_data():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_data"
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=60)).strftime("%Y-%m-%d")
print(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,cl.l1,cl.l2,e.device_id,cut.time " \
"from esmm_train_data e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_level2 cl on e.cid_id = cl.cid " \
"left join cid_time_cut cut on e.cid_id = cut.cid " \
"where e.stat_date >= '{}'".format(start)
df = con_sql(db, sql)
# print(df.shape)
df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "l1",11: "l2",
12: "device_id", 13: "time"})
print("esmm data ok")
# print(df.head(2)
print("before")
print(df.shape)
print("after")
df = df.drop_duplicates()
df = df.drop_duplicates(["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "l1","l2", "time", "stat_date"])
print(df.shape)
unique_values = []
features = ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date"]
for i in features:
df[i] = df[i].astype("str")
df[i] = df[i].fillna("lost")
# 下面这行代码是为了区分不同的列中有相同的值
df[i] = df[i] + i
unique_values.extend(list(df[i].unique()))
for i in ["l1","l2"]:
df[i] = df[i].astype("str")
df[i] = df[i].fillna("lost")
# l1和l2中的值与top类别是一个类别
df[i] = df[i]+"top"
unique_values.extend(list(df[i].unique()))
print("features:")
print(len(unique_values))
print(df.head(2))
temp = list(range(1,len(unique_values)+1))
value_map = dict(zip(unique_values,temp))
df = df.drop("device_id", axis=1)
train = df
test = df[df["stat_date"] == validate_date+"stat_date"]
for i in ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "l1", "time", "stat_date","l2"]:
train[i] = train[i].map(value_map)
test[i] = test[i].map(value_map)
print("train shape")
print(train.shape)
print("test shape")
print(test.shape)
write_csv(train, "tr",100000)
write_csv(test, "va",80000)
return validate_date,value_map
def write_csv(df,name,n):
for i in range(0, df.shape[0], n):
if i == 0:
temp = df.iloc[0:n]
elif i + n > df.shape[0]:
temp = df.iloc[i:]
else:
temp = df.iloc[i:i + n]
temp.to_csv(path + name+ "/{}_{}.csv".format(name,i), index=False)
def get_predict(date,value_map):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.label,e.ucity_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,cl.l1,cl.l2,e.device_id,e.cid_id,cut.time " \
"from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_level2 cl on e.cid_id = cl.cid " \
"left join cid_time_cut cut on e.cid_id = cut.cid where device_id = '358035085192742'"
df = con_sql(db, sql)
df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "l1",11:"l2",
12: "device_id", 13: "cid_id", 14: "time"})
df["stat_date"] = date
print("predict shape")
print(df.shape)
df["uid"] = df["device_id"]
df["city"] = df["ucity_id"]
features = ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date"]
for i in features:
df[i] = df[i].astype("str")
df[i] = df[i].fillna("lost")
df[i] = df[i] + i
for i in ["l1","l2"]:
df[i] = df[i].astype("str")
df[i] = df[i].fillna("lost")
# l1和l2中的值与top类别是一个类别
df[i] = df[i]+"top"
native_pre = df[df["label"] == 0]
native_pre = native_pre.drop("label", axis=1)
nearby_pre = df[df["label"] == 1]
nearby_pre = nearby_pre.drop("label", axis=1)
for i in ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "l1", "time", "stat_date","l2"]:
native_pre[i] = native_pre[i].map(value_map)
# TODO 没有覆盖到的类别会处理成na,暂时用0填充,后续完善一下
native_pre[i] = native_pre[i].fillna(0)
nearby_pre[i] = nearby_pre[i].map(value_map)
# TODO 没有覆盖到的类别会处理成na,暂时用0填充,后续完善一下
nearby_pre[i] = nearby_pre[i].fillna(0)
print("native")
print(native_pre.shape)
print(native_pre.head())
native_pre[["uid","city","cid_id"]].to_csv(path+"native.csv",index=False)
write_csv(native_pre, "native",200000)
print("nearby")
print(nearby_pre.shape)
print(nearby_pre.head())
nearby_pre[["uid","city","cid_id"]].to_csv(path+"nearby.csv",index=False)
write_csv(nearby_pre, "nearby", 160000)
if __name__ == '__main__':
path = "/home/gmuser/esmm_data/"
date,value = get_data()
get_predict(date, value)
#! /bin/bash
PYTHON_PATH=/home/gaoyazhe/miniconda3/bin/python
MODEL_PATH=/srv/apps/ffm-baseline/tensnsorflow/es
DATA_PATH=/home/gmuser/esmm_data
echo "rm leave tfrecord"
rm ${DATA_PATH}/tr/*
rm ${DATA_PATH}/va/*
rm ${DATA_PATH}/native/*
rm ${DATA_PATH}/nearby/*
rm -r ${DATA_PATH}/model_ckpt/DeepCvrMTL/201*
echo "data"
${PYTHON_PATH} ${MODEL_PATH}/feature.py > ${DATA_PATH}/infer.log
echo "csv to tfrecord"
${PYTHON_PATH} ${MODEL_PATH}/to_tfrecord.py --input_dir=${DATA_PATH}/tr/ --output_dir=${DATA_PATH}/tr/
${PYTHON_PATH} ${MODEL_PATH}/to_tfrecord.py --input_dir=${DATA_PATH}/va/ --output_dir=${DATA_PATH}/va/
${PYTHON_PATH} ${MODEL_PATH}/to_tfrecord.py --input_dir=${DATA_PATH}/native/ --output_dir=${DATA_PATH}/native/
${PYTHON_PATH} ${MODEL_PATH}/to_tfrecord.py --input_dir=${DATA_PATH}/nearby/ --output_dir=${DATA_PATH}/nearby/
cat ${DATA_PATH}/tr/*.tfrecord > ${DATA_PATH}/tr/tr.tfrecord
cat ${DATA_PATH}/va/*.tfrecord > ${DATA_PATH}/va/va.tfrecord
cat ${DATA_PATH}/native/*.tfrecord > ${DATA_PATH}/native/native.tfrecord
cat ${DATA_PATH}/nearby/*.tfrecord > ${DATA_PATH}/nearby/nearby.tfrecord
rm ${DATA_PATH}/tr/tr_*
rm ${DATA_PATH}/va/va_*
rm ${DATA_PATH}/native/native_*
rm ${DATA_PATH}/nearby/nearby_*
echo "train..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=2 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=1460 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train
echo "infer native..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=1460 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer > ${DATA_PATH}/infer.log
echo "infer nearby..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=1460 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer > ${DATA_PATH}/infer.log
echo "sort and 2sql"
${PYTHON_PATH} ${MODEL_PATH}/to_database.py
#coding=utf-8
from sqlalchemy import create_engine
import pandas as pd
import pymysql
import MySQLdb
import time
def con_sql(sql):
"""
:type sql : str
:rtype : tuple
"""
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
db.close()
return result
def nearby_set_join(lst):
# return ','.join([str(i) for i in list(lst)])
return ','.join([str(i) for i in lst.unique().tolist()])
def native_set_join(lst):
l = lst.unique().tolist()
d = int(len(l)/2)
if d == 0:
d = 1
r = [str(i) for i in l]
r =r[:d]
return ','.join(r)
def main():
# native queue
df2 = pd.read_csv('/home/gmuser/esmm_data/native.csv')
df2['cid_id'] = df2['cid_id'].astype(str)
df1 = pd.read_csv("/home/gmuser/esmm_data/native/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df2["ctr"],df2["cvr"],df2["ctcvr"] = df1["ctr"],df1["cvr"],df1["ctcvr"]
df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':native_set_join}).reset_index(drop=False)
df3.columns = ["device_id","city_id","native_queue"]
print("native_device_count",df3.shape)
# nearby queue
df2 = pd.read_csv('/home/gmuser/esmm_data/nearby.csv')
df2['cid_id'] = df2['cid_id'].astype(str)
df1 = pd.read_csv("/home/gmuser/esmm_data/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df2["ctr"], df2["cvr"], df2["ctcvr"] = df1["ctr"], df1["cvr"], df1["ctcvr"]
df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':nearby_set_join}).reset_index(drop=False)
df4.columns = ["device_id","city_id","nearby_queue"]
print("nearby_device_count",df4.shape)
#union
df_all = pd.merge(df3,df4,on=['device_id','city_id'],how='outer').fillna("")
df_all['device_id'] = df_all['device_id'].astype(str)
df_all['city_id'] = df_all['city_id'].astype(str)
ctime = int(time.time())
df_all["time"] = ctime
print("union_device_count",df_all.shape)
host='10.66.157.22'
port=4000
user='root'
password='3SYz54LS9#^9sBvC'
db='jerry_test'
charset='utf8'
engine = create_engine(str(r"mysql+mysqldb://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
try:
# df_merge = df_all[['device_id','city_id']].apply(lambda x: ''.join(x),axis=1)
df_merge = df_all['device_id'] + df_all['city_id']
df_merge_str = (str(list(df_merge.values))).strip('[]')
delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(df_merge_str)
con = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cur = con.cursor()
cur.execute(delete_str)
con.commit()
df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False)
except Exception as e:
print(e)
if __name__ == '__main__':
main()
\ No newline at end of file
#coding=utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pandas as pd
import sys
import os
import glob
import tensorflow as tf
import numpy as np
import re
from multiprocessing import Pool as ThreadPool
flags = tf.app.flags
FLAGS = flags.FLAGS
LOG = tf.logging
tf.app.flags.DEFINE_string("input_dir", "./", "input dir")
tf.app.flags.DEFINE_string("output_dir", "./", "output dir")
tf.app.flags.DEFINE_integer("threads", 16, "threads num")
def gen_tfrecords(in_file):
basename = os.path.basename(in_file) + ".tfrecord"
out_file = os.path.join(FLAGS.output_dir, basename)
tfrecord_out = tf.python_io.TFRecordWriter(out_file)
df = pd.read_csv(in_file)
for i in range(df.shape[0]):
feats = ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "l1", "time", "stat_date","l2"]
id = np.array([])
for j in feats:
id = np.append(id,df[j][i])
features = tf.train.Features(feature={
"y": tf.train.Feature(float_list=tf.train.FloatList(value=[df["y"][i]])),
"z": tf.train.Feature(float_list=tf.train.FloatList(value=[df["z"][i]])),
"ids": tf.train.Feature(int64_list=tf.train.Int64List(value=id.astype(np.int)))
})
example = tf.train.Example(features = features)
serialized = example.SerializeToString()
tfrecord_out.write(serialized)
tfrecord_out.close()
def main(_):
if not os.path.exists(FLAGS.output_dir):
os.mkdir(FLAGS.output_dir)
file_list = glob.glob(os.path.join(FLAGS.input_dir, "*.csv"))
print("total files: %d" % len(file_list))
pool = ThreadPool(FLAGS.threads) # Sets the pool size
pool.map(gen_tfrecords, file_list)
pool.close()
pool.join()
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
tf.app.run()
\ No newline at end of file
#coding=utf-8
#from __future__ import absolute_import
#from __future__ import division
#from __future__ import print_function
#import argparse
import shutil
#import sys
import os
import json
import glob
from datetime import date, timedelta
from time import time
import random
import tensorflow as tf
#################### CMD Arguments ####################
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_integer("dist_mode", 0, "distribuion mode {0-loacal, 1-single_dist, 2-multi_dist}")
tf.app.flags.DEFINE_string("ps_hosts", '', "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", '', "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("job_name", '', "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("num_threads", 16, "Number of threads")
tf.app.flags.DEFINE_integer("feature_size", 0, "Number of features")
tf.app.flags.DEFINE_integer("field_size", 0, "Number of common fields")
tf.app.flags.DEFINE_integer("embedding_size", 32, "Embedding size")
tf.app.flags.DEFINE_integer("num_epochs", 10, "Number of epochs")
tf.app.flags.DEFINE_integer("batch_size", 64, "Number of batch size")
tf.app.flags.DEFINE_integer("log_steps", 1000, "save summary every steps")
tf.app.flags.DEFINE_float("learning_rate", 0.0005, "learning rate")
tf.app.flags.DEFINE_float("l2_reg", 0.0001, "L2 regularization")
tf.app.flags.DEFINE_string("loss_type", 'log_loss', "loss type {square_loss, log_loss}")
tf.app.flags.DEFINE_float("ctr_task_wgt", 0.5, "loss weight of ctr task")
tf.app.flags.DEFINE_string("optimizer", 'Adam', "optimizer type {Adam, Adagrad, GD, Momentum}")
tf.app.flags.DEFINE_string("deep_layers", '256,128,64', "deep layers")
tf.app.flags.DEFINE_string("dropout", '0.5,0.5,0.5', "dropout rate")
tf.app.flags.DEFINE_boolean("batch_norm", False, "perform batch normaization (True or False)")
tf.app.flags.DEFINE_float("batch_norm_decay", 0.9, "decay for the moving average(recommend trying decay=0.9)")
tf.app.flags.DEFINE_string("data_dir", '', "data dir")
tf.app.flags.DEFINE_string("dt_dir", '', "data dt partition")
tf.app.flags.DEFINE_string("model_dir", '', "model check point dir")
tf.app.flags.DEFINE_string("servable_model_dir", '', "export servable model for TensorFlow Serving")
tf.app.flags.DEFINE_string("task_type", 'train', "task type {train, infer, eval, export}")
tf.app.flags.DEFINE_boolean("clear_existing_model", False, "clear existing model or not")
#40362692,0,0,216:9342395:1.0 301:9351665:1.0 205:7702673:1.0 206:8317829:1.0 207:8967741:1.0 508:9356012:2.30259 210:9059239:1.0 210:9042796:1.0 210:9076972:1.0 210:9103884:1.0 210:9063064:1.0 127_14:3529789:2.3979 127_14:3806412:2.70805
def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
print('Parsing', filenames)
def _parse_fn(record):
features = {
"y": tf.FixedLenFeature([], tf.float32),
"z": tf.FixedLenFeature([], tf.float32),
"ids": tf.FixedLenFeature([11], tf.int64)
}
parsed = tf.parse_single_example(record, features)
y = parsed.pop('y')
z = parsed.pop('z')
return parsed, {"y": y, "z": z}
# Extract lines from input files using the Dataset API, can pass one filename or filename list
dataset = tf.data.TFRecordDataset(filenames).map(_parse_fn, num_parallel_calls=10).prefetch(500000) # multi-thread pre-process then prefetch
# Randomizes input using a window of 256 elements (read into memory)
if perform_shuffle:
dataset = dataset.shuffle(buffer_size=256)
# epochs from blending together.
dataset = dataset.repeat(num_epochs)
dataset = dataset.batch(batch_size) # Batch size to use
# dataset = dataset.padded_batch(batch_size, padded_shapes=({"feeds_ids": [None], "feeds_vals": [None], "title_ids": [None]}, [None])) #不定长补齐
#return dataset.make_one_shot_iterator()
iterator = dataset.make_one_shot_iterator()
batch_features, batch_labels = iterator.get_next()
#return tf.reshape(batch_ids,shape=[-1,field_size]), tf.reshape(batch_vals,shape=[-1,field_size]), batch_labels
#print("-"*100)
#print(batch_features,batch_labels)
return batch_features, batch_labels
def model_fn(features, labels, mode, params):
"""Bulid Model function f(x) for Estimator."""
#------hyperparameters----
field_size = params["field_size"]
feature_size = params["feature_size"]
embedding_size = params["embedding_size"]
l2_reg = params["l2_reg"]
learning_rate = params["learning_rate"]
#optimizer = params["optimizer"]
layers = list(map(int, params["deep_layers"].split(',')))
dropout = list(map(float, params["dropout"].split(',')))
ctr_task_wgt = params["ctr_task_wgt"]
common_dims = field_size*embedding_size
#------bulid weights------
Feat_Emb = tf.get_variable(name='embeddings', shape=[feature_size, embedding_size], initializer=tf.glorot_normal_initializer())
feat_ids = features['ids']
if FLAGS.task_type != "infer":
y = labels['y']
z = labels['z']
#------build f(x)------
with tf.variable_scope("Shared-Embedding-layer"):
embedding_id = tf.nn.embedding_lookup(Feat_Emb,feat_ids)
x_concat = tf.reshape(embedding_id,shape=[-1, common_dims]) # None * (F * K)
with tf.name_scope("CVR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN:
train_phase = True
else:
train_phase = False
x_cvr = x_concat
for i in range(len(layers)):
x_cvr = tf.contrib.layers.fully_connected(inputs=x_cvr, num_outputs=layers[i], \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='cvr_mlp%d' % i)
if FLAGS.batch_norm:
x_cvr = batch_norm_layer(x_cvr, train_phase=train_phase, scope_bn='cvr_bn_%d' %i) #放在RELU之后 https://github.com/ducha-aiki/caffenet-benchmark/blob/master/batchnorm.md#bn----before-or-after-relu
if mode == tf.estimator.ModeKeys.TRAIN:
x_cvr = tf.nn.dropout(x_cvr, keep_prob=dropout[i]) #Apply Dropout after all BN layers and set dropout=0.8(drop_ratio=0.2)
y_cvr = tf.contrib.layers.fully_connected(inputs=x_cvr, num_outputs=1, activation_fn=tf.identity, \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='cvr_out')
y_cvr = tf.reshape(y_cvr,shape=[-1])
with tf.name_scope("CTR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN:
train_phase = True
else:
train_phase = False
x_ctr = x_concat
for i in range(len(layers)):
x_ctr = tf.contrib.layers.fully_connected(inputs=x_ctr, num_outputs=layers[i], \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='ctr_mlp%d' % i)
if FLAGS.batch_norm:
x_ctr = batch_norm_layer(x_ctr, train_phase=train_phase, scope_bn='ctr_bn_%d' %i) #放在RELU之后 https://github.com/ducha-aiki/caffenet-benchmark/blob/master/batchnorm.md#bn----before-or-after-relu
if mode == tf.estimator.ModeKeys.TRAIN:
x_ctr = tf.nn.dropout(x_ctr, keep_prob=dropout[i]) #Apply Dropout after all BN layers and set dropout=0.8(drop_ratio=0.2)
y_ctr = tf.contrib.layers.fully_connected(inputs=x_ctr, num_outputs=1, activation_fn=tf.identity, \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='ctr_out')
y_ctr = tf.reshape(y_ctr,shape=[-1])
with tf.variable_scope("MTL-Layer"):
pctr = tf.sigmoid(y_ctr)
pcvr = tf.sigmoid(y_cvr)
pctcvr = pctr*pcvr
predictions={"pcvr": pcvr, "pctr": pctr, "pctcvr": pctcvr}
export_outputs = {tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: tf.estimator.export.PredictOutput(predictions)}
# Provide an estimator spec for `ModeKeys.PREDICT`
if mode == tf.estimator.ModeKeys.PREDICT:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
export_outputs=export_outputs)
if FLAGS.task_type != "infer":
#------bulid loss------
ctr_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y_ctr, labels=y))
#cvr_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y_ctcvr, labels=z))
cvr_loss = tf.reduce_mean(tf.losses.log_loss(predictions=pctcvr, labels=z))
loss = ctr_task_wgt * ctr_loss + (1 -ctr_task_wgt) * cvr_loss + l2_reg * tf.nn.l2_loss(Feat_Emb)
tf.summary.scalar('ctr_loss', ctr_loss)
tf.summary.scalar('cvr_loss', cvr_loss)
# Provide an estimator spec for `ModeKeys.EVAL`
eval_metric_ops = {
"CTR_AUC": tf.metrics.auc(y, pctr),
#"CTR_F1": tf.contrib.metrics.f1_score(y,pctr),
#"CTR_Precision": tf.metrics.precision(y,pctr),
#"CTR_Recall": tf.metrics.recall(y,pctr),
"CVR_AUC": tf.metrics.auc(z, pcvr),
"CTCVR_AUC": tf.metrics.auc(z, pctcvr)
}
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
loss=loss,
eval_metric_ops=eval_metric_ops)
#------bulid optimizer------
if FLAGS.optimizer == 'Adam':
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate, beta1=0.9, beta2=0.999, epsilon=1e-8)
elif FLAGS.optimizer == 'Adagrad':
optimizer = tf.train.AdagradOptimizer(learning_rate=learning_rate, initial_accumulator_value=1e-8)
elif FLAGS.optimizer == 'Momentum':
optimizer = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.95)
elif FLAGS.optimizer == 'ftrl':
optimizer = tf.train.FtrlOptimizer(learning_rate)
train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
# Provide an estimator spec for `ModeKeys.TRAIN` modes
if mode == tf.estimator.ModeKeys.TRAIN:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
loss=loss,
train_op=train_op)
def batch_norm_layer(x, train_phase, scope_bn):
bn_train = tf.contrib.layers.batch_norm(x, decay=FLAGS.batch_norm_decay, center=True, scale=True, updates_collections=None, is_training=True, reuse=None, scope=scope_bn)
bn_infer = tf.contrib.layers.batch_norm(x, decay=FLAGS.batch_norm_decay, center=True, scale=True, updates_collections=None, is_training=False, reuse=True, scope=scope_bn)
z = tf.cond(tf.cast(train_phase, tf.bool), lambda: bn_train, lambda: bn_infer)
return z
def set_dist_env():
if FLAGS.dist_mode == 1: # 本地分布式测试模式1 chief, 1 ps, 1 evaluator
ps_hosts = FLAGS.ps_hosts.split(',')
chief_hosts = FLAGS.chief_hosts.split(',')
task_index = FLAGS.task_index
job_name = FLAGS.job_name
print('ps_host', ps_hosts)
print('chief_hosts', chief_hosts)
print('job_name', job_name)
print('task_index', str(task_index))
# 无worker参数
tf_config = {
'cluster': {'chief': chief_hosts, 'ps': ps_hosts},
'task': {'type': job_name, 'index': task_index }
}
print(json.dumps(tf_config))
os.environ['TF_CONFIG'] = json.dumps(tf_config)
elif FLAGS.dist_mode == 2: # 集群分布式模式
ps_hosts = FLAGS.ps_hosts.split(',')
worker_hosts = FLAGS.worker_hosts.split(',')
chief_hosts = worker_hosts[0:1] # get first worker as chief
worker_hosts = worker_hosts[2:] # the rest as worker
task_index = FLAGS.task_index
job_name = FLAGS.job_name
print('ps_host', ps_hosts)
print('worker_host', worker_hosts)
print('chief_hosts', chief_hosts)
print('job_name', job_name)
print('task_index', str(task_index))
# use #worker=0 as chief
if job_name == "worker" and task_index == 0:
job_name = "chief"
# use #worker=1 as evaluator
if job_name == "worker" and task_index == 1:
job_name = 'evaluator'
task_index = 0
# the others as worker
if job_name == "worker" and task_index > 1:
task_index -= 2
tf_config = {
'cluster': {'chief': chief_hosts, 'worker': worker_hosts, 'ps': ps_hosts},
'task': {'type': job_name, 'index': task_index }
}
print(json.dumps(tf_config))
os.environ['TF_CONFIG'] = json.dumps(tf_config)
def main(_):
#------check Arguments------
if FLAGS.dt_dir == "":
FLAGS.dt_dir = (date.today() + timedelta(-1)).strftime('%Y%m%d')
FLAGS.model_dir = FLAGS.model_dir + FLAGS.dt_dir
#FLAGS.data_dir = FLAGS.data_dir + FLAGS.dt_dir
print('task_type ', FLAGS.task_type)
print('model_dir ', FLAGS.model_dir)
print('data_dir ', FLAGS.data_dir)
print('dt_dir ', FLAGS.dt_dir)
print('num_epochs ', FLAGS.num_epochs)
print('feature_size ', FLAGS.feature_size)
print('field_size ', FLAGS.field_size)
print('embedding_size ', FLAGS.embedding_size)
print('batch_size ', FLAGS.batch_size)
print('deep_layers ', FLAGS.deep_layers)
print('dropout ', FLAGS.dropout)
print('loss_type ', FLAGS.loss_type)
print('optimizer ', FLAGS.optimizer)
print('learning_rate ', FLAGS.learning_rate)
print('l2_reg ', FLAGS.l2_reg)
print('ctr_task_wgt ', FLAGS.ctr_task_wgt)
#------init Envs------
tr_files = glob.glob("%s/tr/*tfrecord" % FLAGS.data_dir)
random.shuffle(tr_files)
print("tr_files:", tr_files)
va_files = glob.glob("%s/va/*tfrecord" % FLAGS.data_dir)
print("va_files:", va_files)
te_files = glob.glob("%s/*tfrecord" % FLAGS.data_dir)
print("te_files:", te_files)
if FLAGS.clear_existing_model:
try:
shutil.rmtree(FLAGS.model_dir)
except Exception as e:
print(e, "at clear_existing_model")
else:
print("existing model cleaned at %s" % FLAGS.model_dir)
set_dist_env()
#------bulid Tasks------
model_params = {
"field_size": FLAGS.field_size,
"feature_size": FLAGS.feature_size,
"embedding_size": FLAGS.embedding_size,
"learning_rate": FLAGS.learning_rate,
"l2_reg": FLAGS.l2_reg,
"deep_layers": FLAGS.deep_layers,
"dropout": FLAGS.dropout,
"ctr_task_wgt":FLAGS.ctr_task_wgt
}
config = tf.estimator.RunConfig().replace(session_config = tf.ConfigProto(device_count={'GPU':0, 'CPU':FLAGS.num_threads}),
log_step_count_steps=FLAGS.log_steps, save_summary_steps=FLAGS.log_steps)
Estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir=FLAGS.model_dir, params=model_params, config=config)
if FLAGS.task_type == 'train':
train_spec = tf.estimator.TrainSpec(input_fn=lambda: input_fn(tr_files, num_epochs=FLAGS.num_epochs, batch_size=FLAGS.batch_size))
eval_spec = tf.estimator.EvalSpec(input_fn=lambda: input_fn(va_files, num_epochs=1, batch_size=FLAGS.batch_size), steps=None, start_delay_secs=1000, throttle_secs=1200)
result = tf.estimator.train_and_evaluate(Estimator, train_spec, eval_spec)
for key,value in sorted(result[0].items()):
print('%s: %s' % (key,value))
elif FLAGS.task_type == 'eval':
result = Estimator.evaluate(input_fn=lambda: input_fn(va_files, num_epochs=1, batch_size=FLAGS.batch_size))
for key,value in sorted(result.items()):
print('%s: %s' % (key,value))
elif FLAGS.task_type == 'infer':
preds = Estimator.predict(input_fn=lambda: input_fn(te_files, num_epochs=1, batch_size=FLAGS.batch_size), predict_keys=["pctcvr","pctr","pcvr"])
with open(FLAGS.data_dir+"/pred.txt", "w") as fo:
print("-"*100)
with open(FLAGS.data_dir + "/pred.txt", "w") as fo:
for prob in preds:
fo.write("%f\t%f\t%f\n" % (prob['pctr'], prob['pcvr'], prob['pctcvr']))
elif FLAGS.task_type == 'export':
print("Not Implemented, Do It Yourself!")
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
tf.app.run()
\ No newline at end of file
import pandas as pd
import pymysql
import datetime
from sqlalchemy import create_engine
def con_sql(db,sql):
cursor = db.cursor()
try:
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
except Exception:
print("发生异常", Exception)
df = pd.DataFrame()
finally:
db.close()
return df
def multi():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
sql = "select diary_id,level2_ids from diary_feat"
df = con_sql(db, sql).dropna()
print(df.shape)
df = df.rename(columns={0: "cid", 1: "level"})
df["l1"] = "lost"
df["l2"] = "lost"
df["l3"] = "lost"
for i in list(df["level"].unique()):
l = [int(j) for j in i.split(";")]
l = sorted(l)
if len(l) >= 3:
df.loc[df["level"] == i, ["l1"]] = l[0]
df.loc[df["level"] == i, ["l2"]] = l[1]
df.loc[df["level"] == i, ["l3"]] = l[2]
elif len(l) == 2:
df.loc[df["level"] == i, ["l1"]] = l[0]
df.loc[df["level"] == i, ["l2"]] = l[1]
elif len(l) == 1:
df.loc[df["level"] == i, ["l1"]] = l[0]
df = df.drop("level",axis=1)
print(df.head())
# a = list(df["l1"].unique())
# b = list(df["l2"].unique())
# c = list(df["l3"].unique())
# print(len(a))
# print(a)
# print(len(b))
# print(b)
# print(len(c))
# print(c)
yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@10.66.157.22:4000/jerry_test?charset=utf8')
n = 200000
for i in range(0, df.shape[0], n):
if i == 0:
temp = df.iloc[0:n]
elif i + n > df.shape[0]:
temp = df.iloc[i:]
else:
temp = df.iloc[i:i + n]
pd.io.sql.to_sql(temp, "cid_level2", yconnect, schema='jerry_test', if_exists='append', index=False)
print("insert done")
if __name__ == "__main__":
multi()
\ No newline at end of file
import pandas as pd
import pymysql
def con_sql(db,sql):
cursor = db.cursor()
try:
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
except Exception:
print("发生异常", Exception)
df = pd.DataFrame()
finally:
db.close()
return df
def exp():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select native_queue from esmm_device_diary_queue where device_id = '358035085192742'"
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchone()[0]
native = tuple(result.split(","))
print("total")
print(len(native))
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
sql = "select diary_id,level1_ids,level2_ids,level3_ids from diary_feat where diary_id in {}".format(native)
df = con_sql(db,sql)
n = df.shape[0]
one = df[1].unique()
one_map = {}
for i in one:
one_map[i] = df.loc[df[1]==i].shape[0]/n
print(sorted(one_map.items(),key = lambda x:x[1]))
two = df[2].unique()
two_map = {}
print("分界线")
for i in two:
two_map[i] = df.loc[df[2] == i].shape[0] / n
print(sorted(two_map.items(), key=lambda x: x[1]))
def click():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
sql = "select d.cid_id,f.level1_ids,f.level2_ids from data_feed_click d left join diary_feat f " \
"on d.cid_id = f.diary_id where d.device_id = '358035085192742' " \
"and (d.cid_type = 'diary' or d.cid_type = 'diary_video') and d.stat_date > '2018-12-20'"
df = con_sql(db, sql)
n = df.shape[0]
print(n)
one = df[1].unique()
one_map = {}
for i in one:
one_map[i] = df.loc[df[1] == i].shape[0] / n
print(sorted(one_map.items(), key=lambda x: x[1],reverse=True))
two = df[2].unique()
two_map = {}
print("分界线")
for i in two:
two_map[i] = df.loc[df[2] == i].shape[0] / n
print(sorted(two_map.items(), key=lambda x: x[1],reverse=True))
def get_cid():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select cid_id from esmm_train_data where device_id = '358035085192742' " \
"and stat_date >= '2018-12-03'"
df = con_sql(db, sql)[0].values.tolist()
print(",".join(df))
if __name__ == "__main__":
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
# 读取葡萄酒数据集
data = pd.read_csv("G:/dataset/wine.csv")
# 获取第二列Alcohol
x = data["Alcohol"]
# 获取数据的基本情况
print(x.describe())
minMax = MinMaxScaler()
# 将数据进行归一化
x_std = minMax.fit_transform(x)
pd.DataFrame()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment