Commit 7dd548c6 authored by Your Name

Change the predict and sort process: sort predictions in train.py and write the queues to MySQL directly

parent 41fc8e97
@@ -24,14 +24,6 @@ ${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001
echo "infer native..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=10000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${HDFS_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH}/native --hdfs_dir=${HDFS_PATH}/native --task_type=infer
echo "infer nearby..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=10000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${HDFS_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH}/nearby --hdfs_dir=${HDFS_PATH}/nearby --task_type=infer
-echo "sort and 2sql"
-${PYTHON_PATH} ${MODEL_PATH}/to_database.py
-echo "delete files"
-rm /home/gmuser/esmm/*.csv
-rm /home/gmuser/esmm/native/*
-rm /home/gmuser/esmm/nearby/*
#coding=utf-8
-#from __future__ import absolute_import
import pymysql
-#from __future__ import division
-#from __future__ import print_function
-#import argparse
-import shutil
import os
import json
from datetime import date, timedelta
import tensorflow as tf
-import subprocess
import time
-import glob
import pandas as pd
-import random
import datetime
#################### CMD Arguments ####################
FLAGS = tf.app.flags.FLAGS
@@ -63,7 +57,12 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
"tag4_list": tf.VarLenFeature(tf.int64),
"tag5_list": tf.VarLenFeature(tf.int64),
"tag6_list": tf.VarLenFeature(tf.int64),
-"tag7_list": tf.VarLenFeature(tf.int64)
"tag7_list": tf.VarLenFeature(tf.int64),
"search_tag2_list": tf.VarLenFeature(tf.int64),
"search_tag3_list": tf.VarLenFeature(tf.int64),
"uid": tf.VarLenFeature(tf.string),
"city": tf.VarLenFeature(tf.string),
"cid_id": tf.VarLenFeature(tf.string)
}
parsed = tf.parse_single_example(record, features)
y = parsed.pop('y')
@@ -102,7 +101,6 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
#print(batch_features,batch_labels)
return batch_features, batch_labels
def model_fn(features, labels, mode, params):
"""Build Model function f(x) for Estimator."""
#------hyperparameters----
@@ -131,6 +129,12 @@ def model_fn(features, labels, mode, params):
tag5_list = features['tag5_list']
tag6_list = features['tag6_list']
tag7_list = features['tag7_list']
search_tag2_list = features['search_tag2_list']
search_tag3_list = features['search_tag3_list']
uid = features['uid']
city = features['city']
cid_id = features['cid_id']
if FLAGS.task_type != "infer":
y = labels['y']
@@ -149,10 +153,17 @@ def model_fn(features, labels, mode, params):
tag5 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag5_list, sp_weights=None, combiner="sum")
tag6 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag6_list, sp_weights=None, combiner="sum")
tag7 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag7_list, sp_weights=None, combiner="sum")
search_tag2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=search_tag2_list, sp_weights=None, combiner="sum")
search_tag3 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=search_tag3_list, sp_weights=None, combiner="sum")
# x_concat = tf.reshape(embedding_id,shape=[-1, common_dims]) # None * (F * K)
x_concat = tf.concat([tf.reshape(embedding_id, shape=[-1, common_dims]), app_id, level2, level3, tag1,
-tag2, tag3, tag4, tag5, tag6, tag7], axis=1)
tag2, tag3, tag4, tag5, tag6, tag7, search_tag2, search_tag3], axis=1)
uid = tf.sparse.to_dense(uid,default_value="")
city = tf.sparse.to_dense(city,default_value="")
cid_id = tf.sparse.to_dense(cid_id,default_value="")
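# uid, city and cid_id are densified here so these string ids can be returned
# together with pctcvr in the prediction output below.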
with tf.name_scope("CVR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN:
@@ -198,7 +209,7 @@ def model_fn(features, labels, mode, params):
pcvr = tf.sigmoid(y_cvr)
pctcvr = pctr*pcvr
-predictions={"pcvr": pcvr, "pctr": pctr, "pctcvr": pctcvr}
predictions = {"pctcvr": pctcvr, "uid": uid, "city": city, "cid_id": cid_id}
export_outputs = {tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: tf.estimator.export.PredictOutput(predictions)}
# Provide an estimator spec for `ModeKeys.PREDICT`
if mode == tf.estimator.ModeKeys.PREDICT:
@@ -306,26 +317,26 @@ def set_dist_env():
print(json.dumps(tf_config))
os.environ['TF_CONFIG'] = json.dumps(tf_config)
-def main(_):
def main(file_path):
#------check Arguments------
if FLAGS.dt_dir == "":
FLAGS.dt_dir = (date.today() + timedelta(-1)).strftime('%Y%m%d')
FLAGS.model_dir = FLAGS.model_dir + FLAGS.dt_dir
#FLAGS.data_dir = FLAGS.data_dir + FLAGS.dt_dir
-tr_files = ["hdfs://172.16.32.4:8020/strategy/esmm/tr/part-r-00000"]
va_files = ["hdfs://172.16.32.4:8020/strategy/esmm/va/part-r-00000"]
-te_files = ["%s/part-r-00000" % FLAGS.hdfs_dir]
-if FLAGS.clear_existing_model:
-try:
-shutil.rmtree(FLAGS.model_dir)
-except Exception as e:
-print(e, "at clear_existing_model")
-else:
-print("existing model cleaned at %s" % FLAGS.model_dir)
-set_dist_env()
# if FLAGS.clear_existing_model:
# try:
# shutil.rmtree(FLAGS.model_dir)
# except Exception as e:
# print(e, "at clear_existing_model")
# else:
# print("existing model cleaned at %s" % FLAGS.model_dir)
# set_dist_env()
#------build Tasks------
model_params = {
@@ -343,7 +354,7 @@ def main(_):
Estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir=FLAGS.model_dir, params=model_params, config=config)
if FLAGS.task_type == 'train':
-train_spec = tf.estimator.TrainSpec(input_fn=lambda: input_fn(tr_files, num_epochs=FLAGS.num_epochs, batch_size=FLAGS.batch_size))
train_spec = tf.estimator.TrainSpec(input_fn=lambda: input_fn(file_path, num_epochs=FLAGS.num_epochs, batch_size=FLAGS.batch_size))
eval_spec = tf.estimator.EvalSpec(input_fn=lambda: input_fn(va_files, num_epochs=1, batch_size=FLAGS.batch_size), steps=None, start_delay_secs=1000, throttle_secs=1200)
result = tf.estimator.train_and_evaluate(Estimator, train_spec, eval_spec)
for key,value in sorted(result[0].items()):
@@ -353,18 +364,67 @@ def main(_):
for key,value in sorted(result.items()):
print('%s: %s' % (key,value))
elif FLAGS.task_type == 'infer':
-preds = Estimator.predict(input_fn=lambda: input_fn(te_files, num_epochs=1, batch_size=FLAGS.batch_size), predict_keys=["pctcvr","pctr","pcvr"])
preds = Estimator.predict(input_fn=lambda: input_fn(file_path, num_epochs=1, batch_size=FLAGS.batch_size), predict_keys=["pctcvr","uid","city","cid_id"])
-with open(FLAGS.local_dir + "/pred.txt", "w") as fo:
result = []
for prob in preds:
-fo.write("%f\t%f\t%f\n" % (prob['pctr'], prob['pcvr'], prob['pctcvr']))
result.append([str(prob["uid"][0]), str(prob["city"][0]), str(prob["cid_id"][0]), str(prob['pctcvr'])])
return result
elif FLAGS.task_type == 'export':
print("Not Implemented, Do It Yourself!")
def trans(x):
    return str(x)[2:-1] if str(x)[0] == 'b' else x

def set_join(lst):
    l = lst.unique().tolist()
    r = [str(i) for i in l]
    r = r[:500]
    return ','.join(r)
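# df_sort: build a DataFrame from the prediction rows, decode the byte strings with trans(),
# then for each (uid, city) sort by pctcvr in descending order and collapse the cid ids into
# one queue column named after the source directory (native_queue / nearby_queue).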
def df_sort(result, queue_name):
    df = pd.DataFrame(result, columns=["uid", "city", "cid_id", "pctcvr"])
    # print(df.head(10))
    df['uid1'] = df['uid'].apply(trans)
    df['city1'] = df['city'].apply(trans)
    df['cid_id1'] = df['cid_id'].apply(trans)
    df2 = df.groupby(by=["uid1", "city1"]).apply(lambda x: x.sort_values(by="pctcvr", ascending=False)) \
        .reset_index(drop=True).groupby(by=["uid1", "city1"]).agg({'cid_id1': set_join}).reset_index(drop=False)
    df2.columns = ["device_id", "city_id", queue_name]
    df2["time"] = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
    return df2
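# update_or_insert: write one row per device into esmm_device_diary_queue using
# INSERT ... ON DUPLICATE KEY UPDATE, so an existing row is refreshed in place.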
def update_or_insert(df2, queue_name):
    device_count = df2.shape[0]
    con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test', charset='utf8')
    cur = con.cursor()
    try:
        for i in range(0, device_count):
            query = """INSERT INTO esmm_device_diary_queue (device_id, city_id, time,%s) VALUES('%s', '%s', '%s', '%s') \
            ON DUPLICATE KEY UPDATE device_id='%s', city_id='%s', time='%s', %s='%s'""" % (queue_name, df2.device_id[i], df2.city_id[i], df2.time[i], df2[queue_name][i], df2.device_id[i], df2.city_id[i], df2.time[i], queue_name, df2[queue_name][i])
            cur.execute(query)
        con.commit()
        con.close()
        print("insert or update success")
    except Exception as e:
        print(e)
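# Entry point: 'train' fits the model on the training TFRecords; 'infer' predicts on the
# native/nearby TFRecords, sorts the results, and upserts the queues into MySQL.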
if __name__ == "__main__":
    b = time.time()
    path = "hdfs://172.16.32.4:8020/strategy/esmm/"
    tf.logging.set_verbosity(tf.logging.INFO)
-   tf.app.run()
    if FLAGS.task_type == 'train':
        print("train task")
        tr_files = ["hdfs://172.16.32.4:8020/strategy/esmm/tr/part-r-00000"]
        main(tr_files)
    elif FLAGS.task_type == 'infer':
        te_files = ["%s/part-r-00000" % FLAGS.hdfs_dir]
        queue_name = te_files[0].split('/')[-2] + "_queue"
        print(queue_name + " task")
        result = main(te_files)
        df = df_sort(result, queue_name)
        update_or_insert(df, queue_name)
    print("Elapsed time (minutes):")
    print((time.time()-b)/60)
\ No newline at end of file