Commit 34e547b3 authored by 高雅喆

add esmm model

parent 8d1dd5a7
*.class
*.log
Model_pipline/model_ckpt/*
data/*
__pycache__/
*.py[cod]
metastore_db/*
*.idea
#!/usr/bin/env python
#coding=utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
import os
import glob
import tensorflow as tf
import numpy as np
import re
from multiprocessing import Pool as ThreadPool
flags = tf.app.flags
FLAGS = flags.FLAGS
LOG = tf.logging
tf.app.flags.DEFINE_string("input_dir", "./", "input dir")
tf.app.flags.DEFINE_string("output_dir", "./", "output dir")
tf.app.flags.DEFINE_integer("threads", 16, "threads num")
# keep the field order and the number of fields fixed
#User_Fileds = set(['101','109_14','110_14','127_14','150_14','121','122','124','125','126','127','128','129'])
#Ad_Fileds = set(['205','206','207','210','216'])
#Context_Fileds = set(['508','509','702','853','301'])
#Common_Fileds = {'1':'1','2':'2','3':'3','4':'4','5':'5','6':'6','7':'7','8':'8','9':'9','10':'10','11':'11','12':'12','13':'13','14':'14','15':'15','16':'16','17':'17','18':'18','19':'19','20':'20','21':'21','22':'22','23':'23','24':'24','25':'25','26':'26','27':'27','28':'28','29':'29','30':'30'}
Common_Fileds = {'1':'1','2':'2','3':'3','4':'4','5':'5','6':'6','7':'7','8':'8','9':'9','10':'10','11':'11'}
UMH_Fileds = {'109_14':('u_cat','12'),'110_14':('u_shop','13'),'127_14':('u_brand','14'),'150_14':('u_int','15')} #user multi-hot feature
Ad_Fileds = {'206':('a_cat','16'),'207':('a_shop','17'),'210':('a_int','18'),'216':('a_brand','19')} #ad feature for DIN
#40362692,0,0,216:9342395:1.0 301:9351665:1.0 205:7702673:1.0 206:8317829:1.0 207:8967741:1.0 508:9356012:2.30259 210:9059239:1.0 210:9042796:1.0 210:9076972:1.0 210:9103884:1.0 210:9063064:1.0 127_14:3529789:2.3979 127_14:3806412:2.70805
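# Illustrative parse of the sample line above (a hand-worked sketch mirroring gen_tfrecords below):
#   splits = re.split('[ :]', "216:9342395:1.0 206:8317829:1.0 127_14:3529789:2.3979")
#   ffv = np.reshape(splits, (-1, 3))
#   # -> [['216', '9342395', '1.0'], ['206', '8317829', '1.0'], ['127_14', '3529789', '2.3979']]
# Each row is (field_id, feature_id, value); field ids are then matched against
# Common_Fileds / UMH_Fileds / Ad_Fileds, and the configured default id is substituted for
# any field that is missing from the line.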
def gen_tfrecords(in_file):
basename = os.path.basename(in_file) + ".tfrecord"
out_file = os.path.join(FLAGS.output_dir, basename)
tfrecord_out = tf.python_io.TFRecordWriter(out_file)
with open(in_file) as fi:
for line in fi:
line = line.strip().split('\t')[-1]
fields = line.strip().split(',')
if len(fields) != 4:
continue
#1 label
y = [float(fields[1])]
z = [float(fields[2])]
feature = {
"y": tf.train.Feature(float_list = tf.train.FloatList(value=y)),
"z": tf.train.Feature(float_list = tf.train.FloatList(value=z))
}
splits = re.split('[ :]', fields[3])
ffv = np.reshape(splits,(-1,3))
#common_mask = np.array([v in Common_Fileds for v in ffv[:,0]])
#af_mask = np.array([v in Ad_Fileds for v in ffv[:,0]])
#cf_mask = np.array([v in Context_Fileds for v in ffv[:,0]])
#2 features that need no special handling
feat_ids = np.array([])
#feat_vals = np.array([])
for f, def_id in Common_Fileds.items():
if f in ffv[:,0]:
mask = np.array(f == ffv[:,0])
feat_ids = np.append(feat_ids, ffv[mask,1])
#np.append(feat_vals,ffv[mask,2].astype(np.float))
else:
feat_ids = np.append(feat_ids, def_id)
#np.append(feat_vals,1.0)
feature.update({"feat_ids": tf.train.Feature(int64_list=tf.train.Int64List(value=feat_ids.astype(np.int)))})
#"feat_vals": tf.train.Feature(float_list=tf.train.FloatList(value=feat_vals))})
#3 special fields handled separately
for f, (fname, def_id) in UMH_Fileds.items():
if f in ffv[:,0]:
mask = np.array(f == ffv[:,0])
feat_ids = ffv[mask,1]
feat_vals= ffv[mask,2]
else:
feat_ids = np.array([def_id])
feat_vals = np.array([1.0])
feature.update({fname+"ids": tf.train.Feature(int64_list=tf.train.Int64List(value=feat_ids.astype(np.int))),
fname+"vals": tf.train.Feature(float_list=tf.train.FloatList(value=feat_vals.astype(np.float)))})
for f, (fname, def_id) in Ad_Fileds.items():
if f in ffv[:,0]:
mask = np.array(f == ffv[:,0])
feat_ids = ffv[mask,1]
else:
feat_ids = np.array([def_id])
feature.update({fname+"ids": tf.train.Feature(int64_list=tf.train.Int64List(value=feat_ids.astype(np.int)))})
# serialized to Example
example = tf.train.Example(features = tf.train.Features(feature = feature))
serialized = example.SerializeToString()
tfrecord_out.write(serialized)
#num_lines += 1
#if num_lines % 10000 == 0:
# print("Process %d" % num_lines)
tfrecord_out.close()
def main(_):
if not os.path.exists(FLAGS.output_dir):
os.mkdir(FLAGS.output_dir)
file_list = glob.glob(os.path.join(FLAGS.input_dir, "*.csv"))
print("total files: %d" % len(file_list))
pool = ThreadPool(FLAGS.threads) # Sets the pool size
pool.map(gen_tfrecords, file_list)
pool.close()
pool.join()
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
tf.app.run()
#!/usr/bin/env python
#coding=utf-8
#from __future__ import absolute_import
#from __future__ import division
#from __future__ import print_function
#import argparse
import shutil
#import sys
import os
import json
import glob
from datetime import date, timedelta
from time import time
import random
import tensorflow as tf
#################### CMD Arguments ####################
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_integer("dist_mode", 0, "distribuion mode {0-loacal, 1-single_dist, 2-multi_dist}")
tf.app.flags.DEFINE_string("ps_hosts", '', "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", '', "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("job_name", '', "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("num_threads", 16, "Number of threads")
tf.app.flags.DEFINE_integer("feature_size", 0, "Number of features")
tf.app.flags.DEFINE_integer("field_size", 0, "Number of common fields")
tf.app.flags.DEFINE_integer("embedding_size", 32, "Embedding size")
tf.app.flags.DEFINE_integer("num_epochs", 10, "Number of epochs")
tf.app.flags.DEFINE_integer("batch_size", 64, "Number of batch size")
tf.app.flags.DEFINE_integer("log_steps", 1000, "save summary every steps")
tf.app.flags.DEFINE_float("learning_rate", 0.0005, "learning rate")
tf.app.flags.DEFINE_float("l2_reg", 0.0001, "L2 regularization")
tf.app.flags.DEFINE_string("loss_type", 'log_loss', "loss type {square_loss, log_loss}")
tf.app.flags.DEFINE_float("ctr_task_wgt", 0.5, "loss weight of ctr task")
tf.app.flags.DEFINE_string("optimizer", 'Adam', "optimizer type {Adam, Adagrad, GD, Momentum}")
tf.app.flags.DEFINE_string("deep_layers", '256,128,64', "deep layers")
tf.app.flags.DEFINE_string("dropout", '0.5,0.5,0.5', "dropout rate")
tf.app.flags.DEFINE_boolean("batch_norm", False, "perform batch normaization (True or False)")
tf.app.flags.DEFINE_float("batch_norm_decay", 0.9, "decay for the moving average(recommend trying decay=0.9)")
tf.app.flags.DEFINE_string("data_dir", '', "data dir")
tf.app.flags.DEFINE_string("dt_dir", '', "data dt partition")
tf.app.flags.DEFINE_string("model_dir", '', "model check point dir")
tf.app.flags.DEFINE_string("servable_model_dir", '', "export servable model for TensorFlow Serving")
tf.app.flags.DEFINE_string("task_type", 'train', "task type {train, infer, eval, export}")
tf.app.flags.DEFINE_boolean("clear_existing_model", False, "clear existing model or not")
#40362692,0,0,216:9342395:1.0 301:9351665:1.0 205:7702673:1.0 206:8317829:1.0 207:8967741:1.0 508:9356012:2.30259 210:9059239:1.0 210:9042796:1.0 210:9076972:1.0 210:9103884:1.0 210:9063064:1.0 127_14:3529789:2.3979 127_14:3806412:2.70805
def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
print('Parsing', filenames)
def _parse_fn(record):
features = {
"y": tf.FixedLenFeature([], tf.float32),
"z": tf.FixedLenFeature([], tf.float32),
"feat_ids": tf.FixedLenFeature([FLAGS.field_size], tf.int64)
#"feat_vals": tf.FixedLenFeature([None], tf.float32),
}
parsed = tf.parse_single_example(record, features)
y = parsed.pop('y')
z = parsed.pop('z')
return parsed, {"y": y, "z": z}
# Extract lines from input files using the Dataset API, can pass one filename or filename list
dataset = tf.data.TFRecordDataset(filenames).map(_parse_fn, num_parallel_calls=10).prefetch(500000) # multi-thread pre-process then prefetch
# Randomizes input using a window of 256 elements (read into memory)
if perform_shuffle:
dataset = dataset.shuffle(buffer_size=256)
# Repeat for num_epochs (shuffle is applied before repeat, which keeps separate epochs from blending together)
dataset = dataset.repeat(num_epochs)
dataset = dataset.batch(batch_size) # Batch size to use
# dataset = dataset.padded_batch(batch_size, padded_shapes=({"feeds_ids": [None], "feeds_vals": [None], "title_ids": [None]}, [None])) # pad variable-length features
#return dataset.make_one_shot_iterator()
iterator = dataset.make_one_shot_iterator()
batch_features, batch_labels = iterator.get_next()
#return tf.reshape(batch_ids,shape=[-1,field_size]), tf.reshape(batch_vals,shape=[-1,field_size]), batch_labels
#print("-"*100)
#print(batch_features,batch_labels)
return batch_features, batch_labels
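# A batch from this input_fn is a pair of dicts: features = {"feat_ids": int64 tensor of
# shape [batch_size, FLAGS.field_size]} and labels = {"y": ..., "z": ...}, each of shape
# [batch_size]. Only feat_ids is parsed here; the multi-hot u_*/a_* features written by
# get_tfrecord.py are not consumed by this model.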
def model_fn(features, labels, mode, params):
"""Bulid Model function f(x) for Estimator."""
#------hyperparameters----
field_size = params["field_size"]
feature_size = params["feature_size"]
embedding_size = params["embedding_size"]
l2_reg = params["l2_reg"]
learning_rate = params["learning_rate"]
#optimizer = params["optimizer"]
layers = list(map(int, params["deep_layers"].split(',')))
dropout = list(map(float, params["dropout"].split(',')))
ctr_task_wgt = params["ctr_task_wgt"]
common_dims = field_size*embedding_size
#------build weights------
Feat_Emb = tf.get_variable(name='embeddings', shape=[feature_size, embedding_size], initializer=tf.glorot_normal_initializer())
#------build features------
#{U-A-X-C features that need no special handling}
feat_ids = features['feat_ids']
#feat_vals = features['feat_vals']
#{User multi-hot}
#{Ad}
#{X multi-hot}
#x_intids = features['x_intids']
#x_intvals = features['x_intvals']
if FLAGS.task_type != "infer":
y = labels['y']
z = labels['z']
#------build f(x)------
with tf.variable_scope("Shared-Embedding-layer"):
common_embs = tf.nn.embedding_lookup(Feat_Emb, feat_ids) # None * F' * K
#common_embs = tf.multiply(common_embs, feat_vals)
x_concat = tf.concat([tf.reshape(common_embs,shape=[-1, common_dims])],axis=1) # None * (F * K)
with tf.name_scope("CVR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN:
train_phase = True
else:
train_phase = False
x_cvr = x_concat
for i in range(len(layers)):
x_cvr = tf.contrib.layers.fully_connected(inputs=x_cvr, num_outputs=layers[i], \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='cvr_mlp%d' % i)
if FLAGS.batch_norm:
x_cvr = batch_norm_layer(x_cvr, train_phase=train_phase, scope_bn='cvr_bn_%d' %i) # place BN after ReLU; see https://github.com/ducha-aiki/caffenet-benchmark/blob/master/batchnorm.md#bn----before-or-after-relu
if mode == tf.estimator.ModeKeys.TRAIN:
x_cvr = tf.nn.dropout(x_cvr, keep_prob=dropout[i]) #Apply Dropout after all BN layers and set dropout=0.8(drop_ratio=0.2)
y_cvr = tf.contrib.layers.fully_connected(inputs=x_cvr, num_outputs=1, activation_fn=tf.identity, \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='cvr_out')
y_cvr = tf.reshape(y_cvr,shape=[-1])
with tf.name_scope("CTR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN:
train_phase = True
else:
train_phase = False
x_ctr = x_concat
for i in range(len(layers)):
x_ctr = tf.contrib.layers.fully_connected(inputs=x_ctr, num_outputs=layers[i], \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='ctr_mlp%d' % i)
if FLAGS.batch_norm:
x_ctr = batch_norm_layer(x_ctr, train_phase=train_phase, scope_bn='ctr_bn_%d' %i) # place BN after ReLU; see https://github.com/ducha-aiki/caffenet-benchmark/blob/master/batchnorm.md#bn----before-or-after-relu
if mode == tf.estimator.ModeKeys.TRAIN:
x_ctr = tf.nn.dropout(x_ctr, keep_prob=dropout[i]) #Apply Dropout after all BN layers and set dropout=0.8(drop_ratio=0.2)
y_ctr = tf.contrib.layers.fully_connected(inputs=x_ctr, num_outputs=1, activation_fn=tf.identity, \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='ctr_out')
y_ctr = tf.reshape(y_ctr,shape=[-1])
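# ESMM multi-task head (the MTL-Layer below): both towers above share the embedding table,
# pCTR and pCVR are predicted separately, and the two are combined as pCTCVR = pCTR * pCVR.
# Because the ctcvr loss is computed against z over all impressions, the CVR tower is
# effectively trained on the entire exposure space rather than only on clicked samples,
# which is the core idea of the ESMM model.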
with tf.variable_scope("MTL-Layer"):
pctr = tf.sigmoid(y_ctr)
pcvr = tf.sigmoid(y_cvr)
pctcvr = pctr*pcvr
predictions={"pcvr": pcvr, "pctr": pctr, "pctcvr": pctcvr}
export_outputs = {tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: tf.estimator.export.PredictOutput(predictions)}
# Provide an estimator spec for `ModeKeys.PREDICT`
if mode == tf.estimator.ModeKeys.PREDICT:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
export_outputs=export_outputs)
if FLAGS.task_type != "infer":
#------build loss------
ctr_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y_ctr, labels=y))
#cvr_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y_ctcvr, labels=z))
cvr_loss = tf.reduce_mean(tf.losses.log_loss(predictions=pctcvr, labels=z))
loss = ctr_task_wgt * ctr_loss + (1 -ctr_task_wgt) * cvr_loss + l2_reg * tf.nn.l2_loss(Feat_Emb)
tf.summary.scalar('ctr_loss', ctr_loss)
tf.summary.scalar('cvr_loss', cvr_loss)
# Provide an estimator spec for `ModeKeys.EVAL`
eval_metric_ops = {
"CTR_AUC": tf.metrics.auc(y, pctr),
#"CTR_F1": tf.contrib.metrics.f1_score(y,pctr),
#"CTR_Precision": tf.metrics.precision(y,pctr),
#"CTR_Recall": tf.metrics.recall(y,pctr),
"CVR_AUC": tf.metrics.auc(z, pcvr),
"CTCVR_AUC": tf.metrics.auc(z, pctcvr)
}
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
loss=loss,
eval_metric_ops=eval_metric_ops)
#------build optimizer------
if FLAGS.optimizer == 'Adam':
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate, beta1=0.9, beta2=0.999, epsilon=1e-8)
elif FLAGS.optimizer == 'Adagrad':
optimizer = tf.train.AdagradOptimizer(learning_rate=learning_rate, initial_accumulator_value=1e-8)
elif FLAGS.optimizer == 'Momentum':
optimizer = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.95)
elif FLAGS.optimizer == 'ftrl':
optimizer = tf.train.FtrlOptimizer(learning_rate)
train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
# Provide an estimator spec for `ModeKeys.TRAIN` modes
if mode == tf.estimator.ModeKeys.TRAIN:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
loss=loss,
train_op=train_op)
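# batch_norm_layer (below) builds two batch_norm ops over the same variables (reuse=True for
# the inference copy) and picks between them with tf.cond, so training updates the moving
# statistics in place (updates_collections=None) while inference only reads them.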
def batch_norm_layer(x, train_phase, scope_bn):
bn_train = tf.contrib.layers.batch_norm(x, decay=FLAGS.batch_norm_decay, center=True, scale=True, updates_collections=None, is_training=True, reuse=None, scope=scope_bn)
bn_infer = tf.contrib.layers.batch_norm(x, decay=FLAGS.batch_norm_decay, center=True, scale=True, updates_collections=None, is_training=False, reuse=True, scope=scope_bn)
z = tf.cond(tf.cast(train_phase, tf.bool), lambda: bn_train, lambda: bn_infer)
return z
def set_dist_env():
if FLAGS.dist_mode == 1: # local distributed test mode: 1 chief, 1 ps, 1 evaluator
ps_hosts = FLAGS.ps_hosts.split(',')
chief_hosts = FLAGS.chief_hosts.split(',')
task_index = FLAGS.task_index
job_name = FLAGS.job_name
print('ps_host', ps_hosts)
print('chief_hosts', chief_hosts)
print('job_name', job_name)
print('task_index', str(task_index))
# no worker entries in the cluster spec
tf_config = {
'cluster': {'chief': chief_hosts, 'ps': ps_hosts},
'task': {'type': job_name, 'index': task_index }
}
print(json.dumps(tf_config))
os.environ['TF_CONFIG'] = json.dumps(tf_config)
elif FLAGS.dist_mode == 2: # cluster distributed mode
ps_hosts = FLAGS.ps_hosts.split(',')
worker_hosts = FLAGS.worker_hosts.split(',')
chief_hosts = worker_hosts[0:1] # get first worker as chief
worker_hosts = worker_hosts[2:] # the rest as worker
task_index = FLAGS.task_index
job_name = FLAGS.job_name
print('ps_host', ps_hosts)
print('worker_host', worker_hosts)
print('chief_hosts', chief_hosts)
print('job_name', job_name)
print('task_index', str(task_index))
# use #worker=0 as chief
if job_name == "worker" and task_index == 0:
job_name = "chief"
# use #worker=1 as evaluator
if job_name == "worker" and task_index == 1:
job_name = 'evaluator'
task_index = 0
# the others as worker
if job_name == "worker" and task_index > 1:
task_index -= 2
tf_config = {
'cluster': {'chief': chief_hosts, 'worker': worker_hosts, 'ps': ps_hosts},
'task': {'type': job_name, 'index': task_index }
}
print(json.dumps(tf_config))
os.environ['TF_CONFIG'] = json.dumps(tf_config)
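# Example of the TF_CONFIG this emits for dist_mode=2 (hostnames here are illustrative only):
# {"cluster": {"chief": ["host-0:2222"], "worker": ["host-2:2222", "host-3:2222"],
#              "ps": ["host-ps0:2222"]},
#  "task": {"type": "chief", "index": 0}}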
def main(_):
#------check Arguments------
if FLAGS.dt_dir == "":
FLAGS.dt_dir = (date.today() + timedelta(-1)).strftime('%Y%m%d')
FLAGS.model_dir = FLAGS.model_dir + FLAGS.dt_dir
#FLAGS.data_dir = FLAGS.data_dir + FLAGS.dt_dir
print('task_type ', FLAGS.task_type)
print('model_dir ', FLAGS.model_dir)
print('data_dir ', FLAGS.data_dir)
print('dt_dir ', FLAGS.dt_dir)
print('num_epochs ', FLAGS.num_epochs)
print('feature_size ', FLAGS.feature_size)
print('field_size ', FLAGS.field_size)
print('embedding_size ', FLAGS.embedding_size)
print('batch_size ', FLAGS.batch_size)
print('deep_layers ', FLAGS.deep_layers)
print('dropout ', FLAGS.dropout)
print('loss_type ', FLAGS.loss_type)
print('optimizer ', FLAGS.optimizer)
print('learning_rate ', FLAGS.learning_rate)
print('l2_reg ', FLAGS.l2_reg)
print('ctr_task_wgt ', FLAGS.ctr_task_wgt)
#------init Envs------
tr_files = glob.glob("%s/tr/*tfrecord" % FLAGS.data_dir)
random.shuffle(tr_files)
print("tr_files:", tr_files)
va_files = glob.glob("%s/va/*tfrecord" % FLAGS.data_dir)
print("va_files:", va_files)
te_files = glob.glob("%s/*tfrecord" % FLAGS.data_dir)
print("te_files:", te_files)
if FLAGS.clear_existing_model:
try:
shutil.rmtree(FLAGS.model_dir)
except Exception as e:
print(e, "at clear_existing_model")
else:
print("existing model cleaned at %s" % FLAGS.model_dir)
set_dist_env()
#------build tasks------
model_params = {
"field_size": FLAGS.field_size,
"feature_size": FLAGS.feature_size,
"embedding_size": FLAGS.embedding_size,
"learning_rate": FLAGS.learning_rate,
"l2_reg": FLAGS.l2_reg,
"deep_layers": FLAGS.deep_layers,
"dropout": FLAGS.dropout,
"ctr_task_wgt":FLAGS.ctr_task_wgt
}
config = tf.estimator.RunConfig().replace(session_config = tf.ConfigProto(device_count={'GPU':0, 'CPU':FLAGS.num_threads}),
log_step_count_steps=FLAGS.log_steps, save_summary_steps=FLAGS.log_steps)
Estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir=FLAGS.model_dir, params=model_params, config=config)
if FLAGS.task_type == 'train':
train_spec = tf.estimator.TrainSpec(input_fn=lambda: input_fn(tr_files, num_epochs=FLAGS.num_epochs, batch_size=FLAGS.batch_size))
eval_spec = tf.estimator.EvalSpec(input_fn=lambda: input_fn(va_files, num_epochs=1, batch_size=FLAGS.batch_size), steps=None, start_delay_secs=1000, throttle_secs=1200)
result = tf.estimator.train_and_evaluate(Estimator, train_spec, eval_spec)
for key,value in sorted(result[0].items()):
print('%s: %s' % (key,value))
elif FLAGS.task_type == 'eval':
result = Estimator.evaluate(input_fn=lambda: input_fn(va_files, num_epochs=1, batch_size=FLAGS.batch_size))
for key,value in sorted(result.items()):
print('%s: %s' % (key,value))
elif FLAGS.task_type == 'infer':
preds = Estimator.predict(input_fn=lambda: input_fn(te_files, num_epochs=1, batch_size=FLAGS.batch_size), predict_keys=["pctcvr","pctr","pcvr"])
with open(FLAGS.data_dir+"/pred.txt", "w") as fo:
print("-"*100)
with open(FLAGS.data_dir + "/pred.txt", "w") as fo:
for prob in preds:
fo.write("%f\t%f\n" % (prob['pctr'], prob['pcvr']))
elif FLAGS.task_type == 'export':
print("Not Implemented, Do It Yourself!")
#feature_spec = tf.feature_column.make_parse_example_spec(feature_columns)
#feature_spec = {
# 'feat_ids': tf.FixedLenFeature(dtype=tf.int64, shape=[None, FLAGS.field_size]),
# 'feat_vals': tf.FixedLenFeature(dtype=tf.float32, shape=[None, FLAGS.field_size])
#}
#serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
#feature_spec = {
# 'feat_ids': tf.placeholder(dtype=tf.int64, shape=[None, FLAGS.field_size], name='feat_ids'),
# 'feat_vals': tf.placeholder(dtype=tf.float32, shape=[None, FLAGS.field_size], name='feat_vals')
#}
#serving_input_receiver_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(feature_spec)
#Estimator.export_savedmodel(FLAGS.servable_model_dir, serving_input_receiver_fn)
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
tf.app.run()
# -*- coding: utf-8 -*-
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
import datetime
my_sender='gaoyazhe@igengmei.com'
my_pass = 'VCrKTui99a7ALhiK'
my_user='gaoyazhe@igengmei.com'
def mail():
ret=True
try:
with open('/srv/apps/ffm-baseline/eda/esmm/Model_pipline/train.log') as f:
stat_data = f.read()
msg=MIMEText(stat_data,'plain','utf-8')
msg['From']=formataddr(["高雅喆",my_sender])
msg['To']=formataddr(["高雅喆",my_user])
msg['Subject']= str(datetime.date.today())+"-esmm multi-task model training metrics"
server=smtplib.SMTP_SSL("smtp.exmail.qq.com", 465)
server.login(my_sender, my_pass)
server.sendmail(my_sender,[my_user,],msg.as_string())
server.quit()
except Exception:
ret=False
return ret
ret=mail()
if ret:
print("邮件发送成功")
else:
print("邮件发送失败")
from sqlalchemy import create_engine
import pandas as pd
import pymysql
import MySQLdb
import time
def con_sql(sql):
"""
:type sql : str
:rtype : tuple
"""
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
db.close()
return result
def set_join(lst):
return ','.join(set(lst))
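# Note: set() deduplicates repeated cids before joining, but it does not preserve the
# cvr-descending order produced by the sort in main() below.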
def main():
sql = "select device_id,city_id,cid from esmm_data2ffm_infer_native"
result = con_sql(sql)
dct = {"uid":[],"city":[],"cid_id":[]}
for i in result:
dct["uid"].append(i[0])
dct["city"].append(i[1])
dct["cid_id"].append(i[2])
df1 = pd.read_csv("/srv/apps/ffm-baseline/eda/esmm/data/native/pred.txt",sep='\t',header=None,names=["ctr","cvr"])
df2 = pd.DataFrame(dct)
df2["ctr"],df2["cvr"] = df1["ctr"],df1["cvr"]
df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="cvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
ctime = int(time.time())
df3["time"] = ctime
df3.columns = ["device_id","city_id","native_queue","time"]
print("native_device_count",df3.shape)
sql_nearby = "select device_id,city_id,cid from esmm_data2ffm_infer_nearby"
result = con_sql(sql_nearby)
dct = {"uid":[],"city":[],"cid_id":[]}
for i in result:
dct["uid"].append(i[0])
dct["city"].append(i[1])
dct["cid_id"].append(i[2])
df1 = pd.read_csv("/srv/apps/ffm-baseline/eda/esmm/data/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr"])
df2 = pd.DataFrame(dct)
df2["ctr"],df2["cvr"] = df1["ctr"],df1["cvr"]
df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="cvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
df4.columns = ["device_id","city_id","nearby_queue"]
print("nearby_device_count",df4.shape)
#union
df_all = pd.merge(df3,df4,on=['device_id','city_id'],how='outer').fillna("")
print("union_device_count",df_all.shape)
host='10.66.157.22'
port=4000
user='root'
password='3SYz54LS9#^9sBvC'
db='jerry_test'
charset='utf8'
engine = create_engine("mysql+mysqldb://%s:%s@%s:%s/%s" % (user, password, host, port, db))
try:
df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='replace',index=False)
except Exception as e:
print(e)
if __name__ == '__main__':
main()
#! /bin/bash
current=$(date "+%Y-%m-%d %H:%M:%S")
timeStamp=$(date -d "$current" +%s)
currentTimeStamp=$((timeStamp*1000+`date "+%N"`/1000000))
echo $currentTimeStamp
echo "rm leave tfrecord"
rm /srv/apps/ffm-baseline/eda/esmm/data/tr/*
rm /srv/apps/ffm-baseline/eda/esmm/data/va/*
rm /srv/apps/ffm-baseline/eda/esmm/data/native/*
rm /srv/apps/ffm-baseline/eda/esmm/data/nearby/*
echo "mysql to csv"
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_train" > /srv/apps/ffm-baseline/eda/esmm/data/tr.csv
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_cv" > /srv/apps/ffm-baseline/eda/esmm/data/va.csv
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_infer_native" > /srv/apps/ffm-baseline/eda/esmm/data/native.csv
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_infer_nearby" > /srv/apps/ffm-baseline/eda/esmm/data/nearby.csv
echo "split data"
split -l $((`wc -l < /srv/apps/ffm-baseline/eda/esmm/data/tr.csv`/15)) /srv/apps/ffm-baseline/eda/esmm/data/tr.csv -d -a 4 /srv/apps/ffm-baseline/eda/esmm/data/tr/tr_ --additional-suffix=.csv
split -l $((`wc -l < /srv/apps/ffm-baseline/eda/esmm/data/va.csv`/5)) /srv/apps/ffm-baseline/eda/esmm/data/va.csv -d -a 4 /srv/apps/ffm-baseline/eda/esmm/data/va/va_ --additional-suffix=.csv
split -l $((`wc -l < /srv/apps/ffm-baseline/eda/esmm/data/native.csv`/5)) /srv/apps/ffm-baseline/eda/esmm/data/native.csv -d -a 4 /srv/apps/ffm-baseline/eda/esmm/data/native/native_ --additional-suffix=.csv
split -l $((`wc -l < /srv/apps/ffm-baseline/eda/esmm/data/nearby.csv`/5)) /srv/apps/ffm-baseline/eda/esmm/data/nearby.csv -d -a 4 /srv/apps/ffm-baseline/eda/esmm/data/nearby/nearby_ --additional-suffix=.csv
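# The splits above cut each csv into fixed-size chunks (roughly 1/15 of tr.csv, 1/5 of the
# others) so that get_tfrecord.py can convert the pieces in parallel with its worker pool.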
echo "csv to tfrecord"
/home/gaoyazhe/miniconda3/bin/python /srv/apps/ffm-baseline/eda/esmm/Feature_pipline/get_tfrecord.py --input_dir=/srv/apps/ffm-baseline/eda/esmm/data/tr/ --output_dir=/srv/apps/ffm-baseline/eda/esmm/data/tr/
/home/gaoyazhe/miniconda3/bin/python /srv/apps/ffm-baseline/eda/esmm/Feature_pipline/get_tfrecord.py --input_dir=/srv/apps/ffm-baseline/eda/esmm/data/va/ --output_dir=/srv/apps/ffm-baseline/eda/esmm/data/va/
/home/gaoyazhe/miniconda3/bin/python /srv/apps/ffm-baseline/eda/esmm/Feature_pipline/get_tfrecord.py --input_dir=/srv/apps/ffm-baseline/eda/esmm/data/native/ --output_dir=/srv/apps/ffm-baseline/eda/esmm/data/native/
/home/gaoyazhe/miniconda3/bin/python /srv/apps/ffm-baseline/eda/esmm/Feature_pipline/get_tfrecord.py --input_dir=/srv/apps/ffm-baseline/eda/esmm/data/nearby/ --output_dir=/srv/apps/ffm-baseline/eda/esmm/data/nearby/
cat /srv/apps/ffm-baseline/eda/esmm/data/tr/*.tfrecord > /srv/apps/ffm-baseline/eda/esmm/data/tr/tr.tfrecord
cat /srv/apps/ffm-baseline/eda/esmm/data/va/*.tfrecord > /srv/apps/ffm-baseline/eda/esmm/data/va/va.tfrecord
cat /srv/apps/ffm-baseline/eda/esmm/data/native/*.tfrecord > /srv/apps/ffm-baseline/eda/esmm/data/native/native.tfrecord
cat /srv/apps/ffm-baseline/eda/esmm/data/nearby/*.tfrecord > /srv/apps/ffm-baseline/eda/esmm/data/nearby/nearby.tfrecord
rm /srv/apps/ffm-baseline/eda/esmm/data/tr/tr_*
rm /srv/apps/ffm-baseline/eda/esmm/data/va/va_*
rm /srv/apps/ffm-baseline/eda/esmm/data/native/native_*
rm /srv/apps/ffm-baseline/eda/esmm/data/nearby/nearby_*
echo "train..."
/home/gaoyazhe/miniconda3/bin/python /srv/apps/ffm-baseline/eda/esmm/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=354332 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=/srv/apps/ffm-baseline/eda/esmm/Model_pipline/model_ckpt/DeepCvrMTL/ --data_dir="/srv/apps/ffm-baseline/eda/esmm/data" --task_type="train" > /srv/apps/ffm-baseline/eda/esmm/Model_pipline/train.log
/home/gaoyazhe/miniconda3/bin/python /srv/apps/ffm-baseline/eda/esmm/Model_pipline/send_mail.py
echo "infer native..."
/home/gaoyazhe/miniconda3/bin/python /srv/apps/ffm-baseline/eda/esmm/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=354332 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=/srv/apps/ffm-baseline/eda/esmm/Model_pipline/model_ckpt/DeepCvrMTL/ --data_dir="/srv/apps/ffm-baseline/eda/esmm/data/native" --task_type="infer"
echo "infer nearby..."
/home/gaoyazhe/miniconda3/bin/python /srv/apps/ffm-baseline/eda/esmm/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=354332 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=/srv/apps/ffm-baseline/eda/esmm/Model_pipline/model_ckpt/DeepCvrMTL/ --data_dir="/srv/apps/ffm-baseline/eda/esmm/data/nearby" --task_type="infer"
echo "sort and 2sql"
/home/gaoyazhe/miniconda3/bin/python /srv/apps/ffm-baseline/eda/esmm/Model_pipline/sort_and_2sql.py
current=$(date "+%Y-%m-%d %H:%M:%S")
timeStamp=$(date -d "$current" +%s)
currentTimeStamp=$((timeStamp*1000+`date "+%N"`/1000000))
echo $currentTimeStamp