Commit 84ddcbd3 authored by 张彦钊's avatar 张彦钊

修改特征工程文件

parent befbf04c
......@@ -104,15 +104,15 @@ def feature_engineer():
unique_values = []
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct stat_date from esmm_train_data_dwell"
sql = "select distinct stat_date from esmm_train_data_share"
unique_values.extend(get_unique(db,sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct ucity_id from esmm_train_data_dwell"
sql = "select distinct ucity_id from esmm_train_data_share"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct ccity_name from esmm_train_data_dwell"
sql = "select distinct ccity_name from esmm_train_data_share"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
......@@ -162,15 +162,16 @@ def feature_engineer():
# unique_values.append("video")
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_data_dwell"
sql = "select max(stat_date) from esmm_train_data_share"
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=180)).strftime("%Y-%m-%d")
print(start)
print("这是分享数据")
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC')
sql = "select distinct doctor.hospital_id from jerry_test.esmm_train_data_dwell e " \
sql = "select distinct doctor.hospital_id from jerry_test.esmm_train_data_share e " \
"left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \
"left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \
"where e.stat_date >= '{}'".format(start)
......@@ -190,13 +191,13 @@ def feature_engineer():
29 + apps_number + level2_number + level3_number + len(unique_values)))
value_map = dict(zip(unique_values, temp))
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
sql = "select e.y,e.z,e.s,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
"u.channel,c.top,cut.time,dl.app_list,feat.level3_ids,doctor.hospital_id," \
"wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \
"ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7,doris.search_tag2,doris.search_tag3," \
"k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time," \
"e.device_id,e.cid_id " \
"from jerry_test.esmm_train_data_dwell e left join jerry_test.user_feature u on e.device_id = u.device_id " \
"from jerry_test.esmm_train_data_share e left join jerry_test.user_feature u on e.device_id = u.device_id " \
"left join jerry_test.cid_type_top c on e.device_id = c.device_id " \
"left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \
"left join jerry_test.device_app_list dl on e.device_id = dl.device_id " \
......@@ -226,7 +227,7 @@ def feature_engineer():
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
"hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
"maintain_time", "recover_time", "search_tag2", "search_tag3","cid_id","device_id")\
"maintain_time", "recover_time", "search_tag2", "search_tag3","cid_id","device_id","s")\
.rdd.repartition(200).map(
lambda x: (x[0], float(x[1]), float(x[2]), app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map),
app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map),
......@@ -237,40 +238,37 @@ def feature_engineer():
value_map.get(x[20], 9), value_map.get(x[21], 10),
value_map.get(x[22], 11), value_map.get(x[23], 12), value_map.get(x[24], 13),
value_map.get(x[25], 14), value_map.get(x[26], 15)],
app_list_func(x[27], leve2_map), app_list_func(x[28], leve3_map),x[13],x[29],x[30]
app_list_func(x[27], leve2_map), app_list_func(x[28], leve3_map),x[13],x[29],x[30],float(x[31])
))
rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
# TODO 上线后把下面train fliter 删除,因为最近一天的数据也要作为训练集
print("最近一天的数据没有加到训练集里")
train = rdd.filter(lambda x: x[0] != validate_date).map(
lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18]))
x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18],x[19]))
f = time.time()
spark.createDataFrame(train).toDF("y", "z", "app_list", "level2_list", "level3_list",
"tag1_list", "tag2_list", "tag3_list", "tag4_list",
"tag5_list", "tag6_list", "tag7_list", "ids",
"search_tag2_list","search_tag3_list","city","cid_id","uid") \
"search_tag2_list","search_tag3_list","city","cid_id","uid","s") \
.repartition(1).write.format("tfrecords").save(path=path + "tr/", mode="overwrite")
h = time.time()
print("train tfrecord done")
print((h - f) / 60)
print("训练集样本总量:")
print(rdd.count())
get_pre_number()
test = rdd.filter(lambda x: x[0] == validate_date).map(
lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18]))
x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18],x[19]))
spark.createDataFrame(test).toDF("y", "z", "app_list", "level2_list", "level3_list",
"tag1_list", "tag2_list", "tag3_list", "tag4_list",
"tag5_list", "tag6_list", "tag7_list", "ids",
"search_tag2_list","search_tag3_list","city","cid_id","uid") \
"search_tag2_list","search_tag3_list","city","cid_id","uid","s") \
.repartition(1).write.format("tfrecords").save(path=path + "va/", mode="overwrite")
print("va tfrecord done")
......@@ -389,3 +387,9 @@ if __name__ == '__main__':
# get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map)
spark.stop()
from dataProcess import *
from diaryTraining import *
#coding=utf-8
import pymysql
import os
import json
from datetime import date, timedelta
import tensorflow as tf
import time
from utils import *
import pandas as pd
import datetime
#################### CMD Arguments ####################
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_integer("dist_mode", 0, "distribuion mode {0-loacal, 1-single_dist, 2-multi_dist}")
tf.app.flags.DEFINE_string("ps_hosts", '', "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", '', "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("job_name", '', "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("num_threads", 16, "Number of threads")
tf.app.flags.DEFINE_integer("feature_size", 0, "Number of features")
tf.app.flags.DEFINE_integer("field_size", 0, "Number of common fields")
tf.app.flags.DEFINE_integer("embedding_size", 32, "Embedding size")
tf.app.flags.DEFINE_integer("num_epochs", 10, "Number of epochs")
tf.app.flags.DEFINE_integer("batch_size", 64, "Number of batch size")
tf.app.flags.DEFINE_integer("log_steps", 1000, "save summary every steps")
tf.app.flags.DEFINE_float("learning_rate", 0.0005, "learning rate")
tf.app.flags.DEFINE_float("l2_reg", 0.0001, "L2 regularization")
tf.app.flags.DEFINE_string("loss_type", 'log_loss', "loss type {square_loss, log_loss}")
tf.app.flags.DEFINE_float("ctr_task_wgt", 0.5, "loss weight of ctr task")
tf.app.flags.DEFINE_float("cvr_task_wgt", 0.3, "loss weight of cvr task")
tf.app.flags.DEFINE_string("optimizer", 'Adam', "optimizer type {Adam, Adagrad, GD, Momentum}")
tf.app.flags.DEFINE_string("deep_layers", '256,128,64', "deep layers")
tf.app.flags.DEFINE_string("dropout", '0.5,0.5,0.5', "dropout rate")
tf.app.flags.DEFINE_boolean("batch_norm", False, "perform batch normaization (True or False)")
tf.app.flags.DEFINE_float("batch_norm_decay", 0.9, "decay for the moving average(recommend trying decay=0.9)")
tf.app.flags.DEFINE_string("hdfs_dir", '', "hdfs dir")
tf.app.flags.DEFINE_string("local_dir", '', "local dir")
tf.app.flags.DEFINE_string("dt_dir", '', "data dt partition")
tf.app.flags.DEFINE_string("model_dir", '', "model check point dir")
tf.app.flags.DEFINE_string("servable_model_dir", '', "export servable model for TensorFlow Serving")
tf.app.flags.DEFINE_string("task_type", 'train', "task type {train, infer, eval, export}")
tf.app.flags.DEFINE_boolean("clear_existing_model", False, "clear existing model or not")
# 把数据获取、特征转换、模型训练的模型串联在一起
if __name__ == "__main__":
# while True:
# now = datetime.now()
# if (now.hour == 23) and (now.minute == 30):
start_train = time.time()
data_start_date, data_end_date, validation_date, test_date = get_date()
data, test_number, validation_number = feature_en(data_start_date, data_end_date,
validation_date,test_date)
ffm_transform(data, test_number, validation_number)
train()
end_train = time.time()
print("训练模型耗时{}分".format((end_train-start_train)/60))
move_file()
def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
print('Parsing', filenames)
def _parse_fn(record):
features = {
"y": tf.FixedLenFeature([], tf.float32),
"z": tf.FixedLenFeature([], tf.float32),
"s": tf.FixedLenFeature([], tf.float32),
"ids": tf.FixedLenFeature([FLAGS.field_size], tf.int64),
"app_list": tf.VarLenFeature(tf.int64),
"level2_list": tf.VarLenFeature(tf.int64),
"level3_list": tf.VarLenFeature(tf.int64),
"tag1_list": tf.VarLenFeature(tf.int64),
"tag2_list": tf.VarLenFeature(tf.int64),
"tag3_list": tf.VarLenFeature(tf.int64),
"tag4_list": tf.VarLenFeature(tf.int64),
"tag5_list": tf.VarLenFeature(tf.int64),
"tag6_list": tf.VarLenFeature(tf.int64),
"tag7_list": tf.VarLenFeature(tf.int64),
"search_tag2_list": tf.VarLenFeature(tf.int64),
"search_tag3_list": tf.VarLenFeature(tf.int64),
"uid": tf.VarLenFeature(tf.string),
"city": tf.VarLenFeature(tf.string),
"cid_id": tf.VarLenFeature(tf.string)
}
parsed = tf.parse_single_example(record, features)
y = parsed.pop('y')
z = parsed.pop('z')
s = parsed.pop('s')
return parsed, {"y": y, "z": z, "s":s}
# Extract lines from input files using the Dataset API, can pass one filename or filename list
# dataset = tf.data.TFRecordDataset(filenames).map(_parse_fn, num_parallel_calls=8).prefetch(500000) # multi-thread pre-process then prefetch
# Randomizes input using a window of 256 elements (read into memory)
# if perform_shuffle:
# dataset = dataset.shuffle(buffer_size=256)
# epochs from blending together.
# dataset = dataset.repeat(num_epochs)
# dataset = dataset.batch(batch_size) # Batch size to use
files = tf.data.Dataset.list_files(filenames)
dataset = files.apply(
tf.data.experimental.parallel_interleave(
lambda file: tf.data.TFRecordDataset(file),
cycle_length=8
)
)
dataset = dataset.apply(tf.data.experimental.map_and_batch(map_func=_parse_fn, batch_size=batch_size, num_parallel_calls=8))
dataset = dataset.prefetch(10000)
# dataset = dataset.padded_batch(batch_size, padded_shapes=({"feeds_ids": [None], "feeds_vals": [None], "title_ids": [None]}, [None])) #不定长补齐
#return dataset.make_one_shot_iterator()
iterator = dataset.make_one_shot_iterator()
batch_features, batch_labels = iterator.get_next()
#return tf.reshape(batch_ids,shape=[-1,field_size]), tf.reshape(batch_vals,shape=[-1,field_size]), batch_labels
#print("-"*100)
#print(batch_features,batch_labels)
return batch_features, batch_labels
def model_fn(features, labels, mode, params):
"""Bulid Model function f(x) for Estimator."""
#------hyperparameters----
field_size = params["field_size"]
feature_size = params["feature_size"]
embedding_size = params["embedding_size"]
l2_reg = params["l2_reg"]
learning_rate = params["learning_rate"]
#optimizer = params["optimizer"]
layers = list(map(int, params["deep_layers"].split(',')))
dropout = list(map(float, params["dropout"].split(',')))
ctr_task_wgt = params["ctr_task_wgt"]
cvr_task_wgt = params["cvr_task_wgt"]
common_dims = field_size*embedding_size
#------bulid weights------
Feat_Emb = tf.get_variable(name='embeddings', shape=[feature_size, embedding_size], initializer=tf.glorot_normal_initializer())
feat_ids = features['ids']
app_list = features['app_list']
level2_list = features['level2_list']
level3_list = features['level3_list']
tag1_list = features['tag1_list']
tag2_list = features['tag2_list']
tag3_list = features['tag3_list']
tag4_list = features['tag4_list']
tag5_list = features['tag5_list']
tag6_list = features['tag6_list']
tag7_list = features['tag7_list']
search_tag2_list = features['search_tag2_list']
search_tag3_list = features['search_tag3_list']
uid = features['uid']
city = features['city']
cid_id = features['cid_id']
if FLAGS.task_type != "infer":
y = labels['y']
z = labels['z']
s = labels['s']
#------build f(x)------
with tf.variable_scope("Shared-Embedding-layer"):
embedding_id = tf.nn.embedding_lookup(Feat_Emb,feat_ids)
app_id = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=app_list, sp_weights=None, combiner="sum")
level2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=level2_list, sp_weights=None, combiner="sum")
level3 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=level3_list, sp_weights=None, combiner="sum")
tag1 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag1_list, sp_weights=None, combiner="sum")
tag2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag2_list, sp_weights=None, combiner="sum")
tag3 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag3_list, sp_weights=None, combiner="sum")
tag4 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag4_list, sp_weights=None, combiner="sum")
tag5 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag5_list, sp_weights=None, combiner="sum")
tag6 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag6_list, sp_weights=None, combiner="sum")
tag7 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag7_list, sp_weights=None, combiner="sum")
search_tag2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=search_tag2_list, sp_weights=None, combiner="sum")
search_tag3 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=search_tag3_list, sp_weights=None, combiner="sum")
# x_concat = tf.reshape(embedding_id,shape=[-1, common_dims]) # None * (F * K)
x_concat = tf.concat([tf.reshape(embedding_id, shape=[-1, common_dims]), app_id, level2, level3, tag1,
tag2, tag3, tag4, tag5, tag6, tag7,search_tag2,search_tag3], axis=1)
uid = tf.sparse.to_dense(uid,default_value="")
city = tf.sparse.to_dense(city,default_value="")
cid_id = tf.sparse.to_dense(cid_id,default_value="")
with tf.name_scope("CSR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN:
train_phase = True
else:
train_phase = False
x_csr = x_concat
for i in range(len(layers)):
x_csr = tf.contrib.layers.fully_connected(inputs=x_csr, num_outputs=layers[i], \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='csr_mlp%d' % i)
if FLAGS.batch_norm:
x_csr = batch_norm_layer(x_csr, train_phase=train_phase, scope_bn='csr_bn_%d' %i) #放在RELU之后 https://github.com/ducha-aiki/caffenet-benchmark/blob/master/batchnorm.md#bn----before-or-after-relu
if mode == tf.estimator.ModeKeys.TRAIN:
x_csr = tf.nn.dropout(x_csr, keep_prob=dropout[i]) #Apply Dropout after all BN layers and set dropout=0.8(drop_ratio=0.2)
y_csr = tf.contrib.layers.fully_connected(inputs=x_csr, num_outputs=1, activation_fn=tf.identity, \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='csr_out')
y_csr = tf.reshape(y_csr,shape=[-1])
with tf.name_scope("CVR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN:
train_phase = True
else:
train_phase = False
x_cvr = x_concat
for i in range(len(layers)):
x_cvr = tf.contrib.layers.fully_connected(inputs=x_cvr, num_outputs=layers[i], \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='cvr_mlp%d' % i)
if FLAGS.batch_norm:
x_cvr = batch_norm_layer(x_cvr, train_phase=train_phase, scope_bn='cvr_bn_%d' %i) #放在RELU之后 https://github.com/ducha-aiki/caffenet-benchmark/blob/master/batchnorm.md#bn----before-or-after-relu
if mode == tf.estimator.ModeKeys.TRAIN:
x_cvr = tf.nn.dropout(x_cvr, keep_prob=dropout[i]) #Apply Dropout after all BN layers and set dropout=0.8(drop_ratio=0.2)
y_cvr = tf.contrib.layers.fully_connected(inputs=x_cvr, num_outputs=1, activation_fn=tf.identity, \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='cvr_out')
y_cvr = tf.reshape(y_cvr,shape=[-1])
with tf.name_scope("CTR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN:
train_phase = True
else:
train_phase = False
x_ctr = x_concat
for i in range(len(layers)):
x_ctr = tf.contrib.layers.fully_connected(inputs=x_ctr, num_outputs=layers[i], \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='ctr_mlp%d' % i)
if FLAGS.batch_norm:
x_ctr = batch_norm_layer(x_ctr, train_phase=train_phase, scope_bn='ctr_bn_%d' %i) #放在RELU之后 https://github.com/ducha-aiki/caffenet-benchmark/blob/master/batchnorm.md#bn----before-or-after-relu
if mode == tf.estimator.ModeKeys.TRAIN:
x_ctr = tf.nn.dropout(x_ctr, keep_prob=dropout[i]) #Apply Dropout after all BN layers and set dropout=0.8(drop_ratio=0.2)
y_ctr = tf.contrib.layers.fully_connected(inputs=x_ctr, num_outputs=1, activation_fn=tf.identity, \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='ctr_out')
y_ctr = tf.reshape(y_ctr,shape=[-1])
with tf.variable_scope("MTL-Layer"):
pctr = tf.sigmoid(y_ctr)
pcvr = tf.sigmoid(y_cvr)
pcsr = tf.sigmoid(y_csr)
pctcvr = pctr*pcvr
pctcsr = pctr*pcsr
predictions = {"pctcvr": pctcvr,"pctcsr": pctcsr, "uid": uid, "city": city, "cid_id": cid_id}
export_outputs = {tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: tf.estimator.export.PredictOutput(predictions)}
# Provide an estimator spec for `ModeKeys.PREDICT`
if mode == tf.estimator.ModeKeys.PREDICT:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
export_outputs=export_outputs)
if FLAGS.task_type != "infer":
#------bulid loss------
ctr_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y_ctr, labels=y))
#cvr_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y_ctcvr, labels=z))
cvr_loss = tf.reduce_mean(tf.losses.log_loss(predictions=pctcvr, labels=z))
csr_loss = tf.reduce_mean(tf.losses.log_loss(predictions=pctcsr, labels=s))
loss = ctr_task_wgt * ctr_loss + cvr_task_wgt * cvr_loss + (1-ctr_task_wgt-cvr_task_wgt) * csr_loss+ l2_reg * tf.nn.l2_loss(Feat_Emb)
tf.summary.scalar('ctr_loss', ctr_loss)
tf.summary.scalar('cvr_loss', cvr_loss)
tf.summary.scalar('csr_loss', csr_loss)
# Provide an estimator spec for `ModeKeys.EVAL`
eval_metric_ops = {
"CTR_AUC": tf.metrics.auc(y, pctr),
#"CTR_F1": tf.contrib.metrics.f1_score(y,pctr),
#"CTR_Precision": tf.metrics.precision(y,pctr),
#"CTR_Recall": tf.metrics.recall(y,pctr),
"CVR_AUC": tf.metrics.auc(z, pcvr),
"CSR_AUC": tf.metrics.auc(s, pcsr),
"CTCVR_AUC": tf.metrics.auc(z, pctcvr),
"CTCSR_AUC": tf.metrics.auc(z, pctcsr)
}
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
loss=loss,
eval_metric_ops=eval_metric_ops)
#------bulid optimizer------
if FLAGS.optimizer == 'Adam':
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate, beta1=0.9, beta2=0.999, epsilon=1e-8)
elif FLAGS.optimizer == 'Adagrad':
optimizer = tf.train.AdagradOptimizer(learning_rate=learning_rate, initial_accumulator_value=1e-8)
elif FLAGS.optimizer == 'Momentum':
optimizer = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.95)
elif FLAGS.optimizer == 'ftrl':
optimizer = tf.train.FtrlOptimizer(learning_rate)
train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
# Provide an estimator spec for `ModeKeys.TRAIN` modes
if mode == tf.estimator.ModeKeys.TRAIN:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
loss=loss,
train_op=train_op)
def batch_norm_layer(x, train_phase, scope_bn):
bn_train = tf.contrib.layers.batch_norm(x, decay=FLAGS.batch_norm_decay, center=True, scale=True, updates_collections=None, is_training=True, reuse=None, scope=scope_bn)
bn_infer = tf.contrib.layers.batch_norm(x, decay=FLAGS.batch_norm_decay, center=True, scale=True, updates_collections=None, is_training=False, reuse=True, scope=scope_bn)
z = tf.cond(tf.cast(train_phase, tf.bool), lambda: bn_train, lambda: bn_infer)
return z
def set_dist_env():
if FLAGS.dist_mode == 1: # 本地分布式测试模式1 chief, 1 ps, 1 evaluator
ps_hosts = FLAGS.ps_hosts.split(',')
chief_hosts = FLAGS.chief_hosts.split(',')
task_index = FLAGS.task_index
job_name = FLAGS.job_name
print('ps_host', ps_hosts)
print('chief_hosts', chief_hosts)
print('job_name', job_name)
print('task_index', str(task_index))
# 无worker参数
tf_config = {
'cluster': {'chief': chief_hosts, 'ps': ps_hosts},
'task': {'type': job_name, 'index': task_index }
}
print(json.dumps(tf_config))
os.environ['TF_CONFIG'] = json.dumps(tf_config)
elif FLAGS.dist_mode == 2: # 集群分布式模式
ps_hosts = FLAGS.ps_hosts.split(',')
worker_hosts = FLAGS.worker_hosts.split(',')
chief_hosts = worker_hosts[0:1] # get first worker as chief
worker_hosts = worker_hosts[2:] # the rest as worker
task_index = FLAGS.task_index
job_name = FLAGS.job_name
print('ps_host', ps_hosts)
print('worker_host', worker_hosts)
print('chief_hosts', chief_hosts)
print('job_name', job_name)
print('task_index', str(task_index))
# use #worker=0 as chief
if job_name == "worker" and task_index == 0:
job_name = "chief"
# use #worker=1 as evaluator
if job_name == "worker" and task_index == 1:
job_name = 'evaluator'
task_index = 0
# the others as worker
if job_name == "worker" and task_index > 1:
task_index -= 2
tf_config = {
'cluster': {'chief': chief_hosts, 'worker': worker_hosts, 'ps': ps_hosts},
'task': {'type': job_name, 'index': task_index }
}
print(json.dumps(tf_config))
os.environ['TF_CONFIG'] = json.dumps(tf_config)
def main(file_path):
#------check Arguments------
if FLAGS.dt_dir == "":
FLAGS.dt_dir = (date.today() + timedelta(-1)).strftime('%Y%m%d')
FLAGS.model_dir = FLAGS.model_dir + FLAGS.dt_dir
#FLAGS.data_dir = FLAGS.data_dir + FLAGS.dt_dir
va_files = ["hdfs://172.16.32.4:8020/strategy/esmm/va/part-r-00000"]
# if FLAGS.clear_existing_model:
# try:
# shutil.rmtree(FLAGS.model_dir)
# except Exception as e:
# print(e, "at clear_existing_model")
# else:
# print("existing model cleaned at %s" % FLAGS.model_dir)
# set_dist_env()
#------bulid Tasks------
model_params = {
"field_size": FLAGS.field_size,
"feature_size": FLAGS.feature_size,
"embedding_size": FLAGS.embedding_size,
"learning_rate": FLAGS.learning_rate,
"l2_reg": FLAGS.l2_reg,
"deep_layers": FLAGS.deep_layers,
"dropout": FLAGS.dropout,
"ctr_task_wgt":FLAGS.ctr_task_wgt,
"cvr_task_wgt": FLAGS.cvr_task_wgt
}
config = tf.estimator.RunConfig().replace(session_config = tf.ConfigProto(device_count={'GPU':0, 'CPU':FLAGS.num_threads}),
log_step_count_steps=FLAGS.log_steps, save_summary_steps=FLAGS.log_steps)
Estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir=FLAGS.model_dir, params=model_params, config=config)
if FLAGS.task_type == 'train':
train_spec = tf.estimator.TrainSpec(input_fn=lambda: input_fn(file_path, num_epochs=FLAGS.num_epochs, batch_size=FLAGS.batch_size))
eval_spec = tf.estimator.EvalSpec(input_fn=lambda: input_fn(va_files, num_epochs=1, batch_size=FLAGS.batch_size), steps=None, start_delay_secs=1000, throttle_secs=1200)
result = tf.estimator.train_and_evaluate(Estimator, train_spec, eval_spec)
for key,value in sorted(result[0].items()):
print('%s: %s' % (key,value))
elif FLAGS.task_type == 'eval':
result = Estimator.evaluate(input_fn=lambda: input_fn(va_files, num_epochs=1, batch_size=FLAGS.batch_size))
for key,value in sorted(result.items()):
print('%s: %s' % (key,value))
elif FLAGS.task_type == 'infer':
preds = Estimator.predict(input_fn=lambda: input_fn(file_path, num_epochs=1, batch_size=FLAGS.batch_size), predict_keys=["pctcvr","pctcsr","uid","city","cid_id"])
result = []
for prob in preds:
result.append([str(prob["uid"][0]), str(prob["city"][0]), str(prob["cid_id"][0]), str(prob['pctcvr']),str(prob['pctcsr'])])
return result
elif FLAGS.task_type == 'export':
print("Not Implemented, Do It Yourself!")
def trans(x):
return str(x)[2:-1] if str(x)[0] == 'b' else x
def set_join(lst):
l = lst.unique().tolist()
r = [str(i) for i in l]
r =r[:500]
return ','.join(r)
def df_sort(result,queue_name):
df = pd.DataFrame(result, columns=["uid", "city", "cid_id", "pctcvr"])
# print(df.head(10))
df['uid1'] = df['uid'].apply(trans)
df['city1'] = df['city'].apply(trans)
df['cid_id1'] = df['cid_id'].apply(trans)
df2 = df.groupby(by=["uid1", "city1"]).apply(lambda x: x.sort_values(by="pctcvr", ascending=False)) \
.reset_index(drop=True).groupby(by=["uid1", "city1"]).agg({'cid_id1': set_join}).reset_index(drop=False)
df2.columns = ["device_id", "city_id", queue_name]
df2["time"] = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
return df2
def update_or_insert(df2,queue_name):
device_count = df2.shape[0]
con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test', charset = 'utf8')
cur = con.cursor()
try:
for i in range(0, device_count):
query = """INSERT INTO esmm_device_diary_queue (device_id, city_id, time,%s) VALUES('%s', '%s', '%s', '%s') \
ON DUPLICATE KEY UPDATE device_id='%s', city_id='%s', time='%s', %s='%s'""" % (queue_name, df2.device_id[i],df2.city_id[i], df2.time[i], df2[queue_name][i], df2.device_id[i], df2.city_id[i], df2.time[i], queue_name, df2[queue_name][i])
cur.execute(query)
con.commit()
con.close()
print("insert or update sucess")
except Exception as e:
print(e)
if __name__ == "__main__":
b = time.time()
path = "hdfs://172.16.32.4:8020/strategy/esmm/"
tf.logging.set_verbosity(tf.logging.INFO)
if FLAGS.task_type == 'train':
print("train task")
tr_files = ["hdfs://172.16.32.4:8020/strategy/esmm/tr/part-r-00000"]
main(tr_files)
elif FLAGS.task_type == 'infer':
te_files = ["%s/part-r-00000" % FLAGS.hdfs_dir]
queue_name = te_files[0].split('/')[-2] + "_queue"
print(queue_name + " task")
result = main(te_files)
print(result[10])
#df = df_sort(result,queue_name)
#update_or_insert(df,queue_name)
print("耗时(分钟):")
print((time.time()-b)/60)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment