Commit 240e771c authored by 宋柯

Model deployment

parent e95dcc57
source /srv/envs/serviceRec/bin/activate
python_os=/srv/envs/serviceRec/bin/python
path=/srv/apps/serviceRec
day_count=$1
spark_mode=$2
trainDays=$1
itemStatisticStartDays=$2
spark_mode=$3
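# Positional args: $1=trainDays, $2=itemStatisticStartDays (both presumably day counts passed through to featureEngSk.py), $3=spark_mode (any non-empty value runs Spark locally instead of on YARN)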
content_type="service"
pythonFile=${path}/spark/featureEng.py
pythonFile=${path}/spark/featureEngSk.py
#log_file=~/${content_type}_feature_csv_export.log
/opt/hadoop/bin/hdfs dfs -rmr /${content_type}_feature_v1_train
/opt/hadoop/bin/hdfs dfs -rmr /${content_type}_feature_v1_test
if [ -n "${spark_mode}" ]; then
/opt/spark/bin/spark-submit --master local[4] --deploy-mode client --driver-memory 8g --executor-memory 2g --executor-cores 1 --num-executors 4 --conf spark.pyspark.python=${python_os} --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar ${pythonFile} $day_count
/opt/spark/bin/spark-submit --master local[8] --deploy-mode client --driver-memory 16g --executor-memory 2g --executor-cores 1 --num-executors 4 --conf spark.pyspark.python=${python_os} --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar ${pythonFile} $trainDays $itemStatisticStartDays
else
/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 8g --executor-memory 2g --executor-cores 1 --num-executors 8 --conf spark.pyspark.python=${python_os} --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar ${pythonFile} $day_count
/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 8g --executor-memory 2g --executor-cores 1 --num-executors 8 --conf spark.pyspark.python=${python_os} --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar ${pythonFile} $trainDays $itemStatisticStartDays
fi
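# Example (hypothetical values): `sh <this_script> 30 90 local` runs in local mode; omit the third argument to submit to YARN.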
#/opt/spark/bin/spark-submit --master local[4] --deploy-mode client --driver-memory 8g --executor-memory 2g --executor-cores 1 --num-executors 4 --conf spark.pyspark.python=/srv/envs/serviceRec/bin/python --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar ${pythonFile} $day_count
......
cd /srv/apps/tensorServing_models/
cd /data/files/wideAndDeep/tensorServing_models/
fileNum=`ls service/|wc -l`
function pushModel() {
# Package the model files
......
cd /srv/apps/tensorServing_models && rm -rf service_copy && mv service service_copy && mkdir service
cd /data/files/wideAndDeep/tensorServing_models && rm -rf service_copy && mv service service_copy && mkdir service
source /srv/envs/serviceRec/bin/activate
python /srv/apps/serviceRec/train/train_service.py > /srv/apps/serviceRec/logs/train_service_log.log
\ No newline at end of file
python /srv/apps/serviceRec/train/train_service_sk_tf2_distibute.py > /srv/apps/serviceRec/logs/train_service_log.log
\ No newline at end of file
import redis
import sys
import os
import json
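# Export per-field vocabularies from Redis to <field>_vocab.csv files under save_dir,
# then pull the HDFS train/eval sample files down for local training.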
def getRedisConn():
    pool = redis.ConnectionPool(host="172.16.50.145", password="XfkMCCdWDIU%ls$h", port=6379, db=0)
    conn = redis.Redis(connection_pool=pool)
    # conn = redis.Redis(host="172.16.50.145", port=6379, password="XfkMCCdWDIU%ls$h", db=0)
    # conn = redis.Redis(host="172.18.51.10", port=6379, db=0, decode_responses = True)  # test
    return conn
if len(sys.argv) == 2:
    save_dir = sys.argv[1]
else:
    save_dir = '/data/files/wideAndDeep/trainData/'
print('save_dir: ', save_dir)
if not os.path.exists(save_dir):
    print('mkdir save_dir: ', save_dir)
    os.makedirs(save_dir)
conn = getRedisConn()
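# "strategy:all:vocab" is expected to hold a single element containing a Python-literal list
# of per-field vocab keys, hence the eval() of element 0 below.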
vocab_keys = conn.lrange("strategy:all:vocab", 0, -1)
print("vocab_keys: ", vocab_keys[0])
vocab_keys = eval(vocab_keys[0])
for vocab_key in vocab_keys:
    print('vocab_key: ', vocab_key)
    splits = vocab_key.split(":")
    field = splits[1]
    filename = field + "_vocab.csv"
    print('filename: ', filename)
    with open(os.path.join(save_dir, filename), 'w') as f:
        texts = conn.lrange(vocab_key, 0, -1)
        texts = list(filter(lambda x: x != '', eval(texts[0])))
        print('texts: ', len(texts))
        f.write('\n'.join(texts))
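# Merge the HDFS sample directories (presumably written by the Spark feature-engineering job) into single local CSVs.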
os.system("hdfs dfs -getmerge /strategy/train_samples {save_dir}train_samples.csv".format(save_dir = save_dir))
os.system("hdfs dfs -getmerge /strategy/eval_samples {save_dir}eval_samples.csv".format(save_dir = save_dir))
......@@ -7,8 +7,12 @@ os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'
start = time.time()
BASE_DIR = '/data/files/wideAndDeep/'
MODEL_BASE_DIR = '/data/files/wideAndDeep_tf2_dist/'
DATA_DIR = '/data/files/wideAndDeep/trainData/'
CHECKPOINTS_DIR = '/data/files/wideAndDeep/checkpoints/'
if os.path.exists(CHECKPOINTS_DIR):
    import shutil
    shutil.rmtree(CHECKPOINTS_DIR)  # remove any stale checkpoint directory (recursively) before a fresh training run
MODEL_BASE_DIR = '/data/files/wideAndDeep/tensorServing_models/service'
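# CHECKPOINTS_DIR holds Estimator training checkpoints; MODEL_BASE_DIR receives the exported
# SavedModel, presumably packaged by the pushModel shell script for TensorFlow Serving.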
def input_fn(csv_path, epoch, shuffle, batch_size):
    dataset = tf.data.TextLineDataset(csv_path)
......@@ -110,30 +114,30 @@ ITEM_NUMERIC_case_count_fc = tf.feature_column.bucketized_column(tf.feature_colu
ITEM_NUMERIC_sales_count_fc = tf.feature_column.bucketized_column(tf.feature_column.numeric_column('ITEM_NUMERIC_sales_count'), boundaries)
ITEM_NUMERIC_sku_price_fc = tf.feature_column.bucketized_column(tf.feature_column.numeric_column('ITEM_NUMERIC_sku_price'), boundaries)
USER_CATEGORY_device_id_fc = tf.feature_column.categorical_column_with_vocabulary_file('USER_CATEGORY_device_id', BASE_DIR + 'USER_CATEGORY_device_id_vocab.csv')
USER_CATEGORY_os_fc = tf.feature_column.categorical_column_with_vocabulary_file('USER_CATEGORY_os', BASE_DIR + 'USER_CATEGORY_os_vocab.csv')
USER_CATEGORY_user_city_id_fc = tf.feature_column.categorical_column_with_vocabulary_file('USER_CATEGORY_user_city_id', BASE_DIR + 'USER_CATEGORY_user_city_id_vocab.csv')
USER_MULTI_CATEGORY__second_solutions_fc = tf.feature_column.categorical_column_with_vocabulary_file('USER_MULTI_CATEGORY_second_solutions', BASE_DIR + 'USER_MULTI_CATEGORY_second_solutions_vocab.csv')
USER_MULTI_CATEGORY__second_positions_fc = tf.feature_column.categorical_column_with_vocabulary_file('USER_MULTI_CATEGORY_second_positions', BASE_DIR + 'USER_MULTI_CATEGORY_second_positions_vocab.csv')
USER_MULTI_CATEGORY__second_demands_fc = tf.feature_column.categorical_column_with_vocabulary_file('USER_MULTI_CATEGORY_second_demands', BASE_DIR + 'USER_MULTI_CATEGORY_second_demands_vocab.csv')
USER_MULTI_CATEGORY__projects_fc = tf.feature_column.categorical_column_with_vocabulary_file('USER_MULTI_CATEGORY_projects', BASE_DIR + 'USER_MULTI_CATEGORY_projects_vocab.csv')
ITEM_CATEGORY_card_id_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_card_id', BASE_DIR + 'ITEM_CATEGORY_card_id_vocab.csv')
ITEM_CATEGORY_service_type_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_service_type', BASE_DIR + 'ITEM_CATEGORY_service_type_vocab.csv')
ITEM_CATEGORY_merchant_id_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_merchant_id', BASE_DIR + 'ITEM_CATEGORY_merchant_id_vocab.csv')
ITEM_CATEGORY_doctor_type_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_doctor_type', BASE_DIR + 'ITEM_CATEGORY_doctor_type_vocab.csv')
ITEM_CATEGORY_doctor_id_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_doctor_id', BASE_DIR + 'ITEM_CATEGORY_doctor_id_vocab.csv')
ITEM_CATEGORY_doctor_famous_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_doctor_famous', BASE_DIR + 'ITEM_CATEGORY_doctor_famous_vocab.csv')
ITEM_CATEGORY_hospital_id_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_hospital_id', BASE_DIR + 'ITEM_CATEGORY_hospital_id_vocab.csv')
ITEM_CATEGORY_hospital_city_tag_id_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_hospital_city_tag_id', BASE_DIR + 'ITEM_CATEGORY_hospital_city_tag_id_vocab.csv')
ITEM_CATEGORY_hospital_type_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_hospital_type', BASE_DIR + 'ITEM_CATEGORY_hospital_type_vocab.csv')
ITEM_CATEGORY_hospital_is_high_quality_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_hospital_is_high_quality', BASE_DIR + 'ITEM_CATEGORY_hospital_is_high_quality_vocab.csv')
ITEM_MULTI_CATEGORY__second_solutions_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_MULTI_CATEGORY_second_solutions', BASE_DIR + 'ITEM_MULTI_CATEGORY_second_solutions_vocab.csv')
ITEM_MULTI_CATEGORY__second_positions_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_MULTI_CATEGORY_second_positions', BASE_DIR + 'ITEM_MULTI_CATEGORY_second_positions_vocab.csv')
ITEM_MULTI_CATEGORY__second_demands_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_MULTI_CATEGORY_second_demands', BASE_DIR + 'ITEM_MULTI_CATEGORY_second_demands_vocab.csv')
ITEM_MULTI_CATEGORY__projects_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_MULTI_CATEGORY_projects', BASE_DIR + 'ITEM_MULTI_CATEGORY_projects_vocab.csv')
USER_CATEGORY_device_id_fc = tf.feature_column.categorical_column_with_vocabulary_file('USER_CATEGORY_device_id', DATA_DIR + 'USER_CATEGORY_device_id_vocab.csv')
USER_CATEGORY_os_fc = tf.feature_column.categorical_column_with_vocabulary_file('USER_CATEGORY_os', DATA_DIR + 'USER_CATEGORY_os_vocab.csv')
USER_CATEGORY_user_city_id_fc = tf.feature_column.categorical_column_with_vocabulary_file('USER_CATEGORY_user_city_id', DATA_DIR + 'USER_CATEGORY_user_city_id_vocab.csv')
USER_MULTI_CATEGORY__second_solutions_fc = tf.feature_column.categorical_column_with_vocabulary_file('USER_MULTI_CATEGORY_second_solutions', DATA_DIR + 'USER_MULTI_CATEGORY_second_solutions_vocab.csv')
USER_MULTI_CATEGORY__second_positions_fc = tf.feature_column.categorical_column_with_vocabulary_file('USER_MULTI_CATEGORY_second_positions', DATA_DIR + 'USER_MULTI_CATEGORY_second_positions_vocab.csv')
USER_MULTI_CATEGORY__second_demands_fc = tf.feature_column.categorical_column_with_vocabulary_file('USER_MULTI_CATEGORY_second_demands', DATA_DIR + 'USER_MULTI_CATEGORY_second_demands_vocab.csv')
USER_MULTI_CATEGORY__projects_fc = tf.feature_column.categorical_column_with_vocabulary_file('USER_MULTI_CATEGORY_projects', DATA_DIR + 'USER_MULTI_CATEGORY_projects_vocab.csv')
ITEM_CATEGORY_card_id_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_card_id', DATA_DIR + 'ITEM_CATEGORY_card_id_vocab.csv')
ITEM_CATEGORY_service_type_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_service_type', DATA_DIR + 'ITEM_CATEGORY_service_type_vocab.csv')
ITEM_CATEGORY_merchant_id_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_merchant_id', DATA_DIR + 'ITEM_CATEGORY_merchant_id_vocab.csv')
ITEM_CATEGORY_doctor_type_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_doctor_type', DATA_DIR + 'ITEM_CATEGORY_doctor_type_vocab.csv')
ITEM_CATEGORY_doctor_id_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_doctor_id', DATA_DIR + 'ITEM_CATEGORY_doctor_id_vocab.csv')
ITEM_CATEGORY_doctor_famous_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_doctor_famous', DATA_DIR + 'ITEM_CATEGORY_doctor_famous_vocab.csv')
ITEM_CATEGORY_hospital_id_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_hospital_id', DATA_DIR + 'ITEM_CATEGORY_hospital_id_vocab.csv')
ITEM_CATEGORY_hospital_city_tag_id_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_hospital_city_tag_id', DATA_DIR + 'ITEM_CATEGORY_hospital_city_tag_id_vocab.csv')
ITEM_CATEGORY_hospital_type_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_hospital_type', DATA_DIR + 'ITEM_CATEGORY_hospital_type_vocab.csv')
ITEM_CATEGORY_hospital_is_high_quality_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_CATEGORY_hospital_is_high_quality', DATA_DIR + 'ITEM_CATEGORY_hospital_is_high_quality_vocab.csv')
ITEM_MULTI_CATEGORY__second_solutions_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_MULTI_CATEGORY_second_solutions', DATA_DIR + 'ITEM_MULTI_CATEGORY_second_solutions_vocab.csv')
ITEM_MULTI_CATEGORY__second_positions_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_MULTI_CATEGORY_second_positions', DATA_DIR + 'ITEM_MULTI_CATEGORY_second_positions_vocab.csv')
ITEM_MULTI_CATEGORY__second_demands_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_MULTI_CATEGORY_second_demands', DATA_DIR + 'ITEM_MULTI_CATEGORY_second_demands_vocab.csv')
ITEM_MULTI_CATEGORY__projects_fc = tf.feature_column.categorical_column_with_vocabulary_file('ITEM_MULTI_CATEGORY_projects', DATA_DIR + 'ITEM_MULTI_CATEGORY_projects_vocab.csv')
def embedding_fc(categorical_column, dim):
    return tf.feature_column.embedding_column(categorical_column, dim)
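# Illustrative only (not part of this diff): wrap a categorical column for the DNN side,
# e.g. embedding_fc(ITEM_CATEGORY_card_id_fc, 16).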
......@@ -220,7 +224,7 @@ config = tf.estimator.RunConfig(save_checkpoints_steps = 3000, train_distribute
# config = tf.estimator.RunConfig(save_checkpoints_steps = 3000, session_config = session_config)
wideAndDeepModel = tf.estimator.DNNLinearCombinedClassifier(model_dir = MODEL_BASE_DIR + 'model_csv',
wideAndDeepModel = tf.estimator.DNNLinearCombinedClassifier(model_dir = CHECKPOINTS_DIR,
    linear_feature_columns = linear_feature_columns,
    dnn_feature_columns = dnn_feature_columns,
    dnn_hidden_units = [128, 32],
......@@ -229,11 +233,11 @@ wideAndDeepModel = tf.estimator.DNNLinearCombinedClassifier(model_dir = MODEL_BA
# early_stopping = tf.contrib.estimator.stop_if_no_decrease_hook(wideAndDeepModel, eval_dir = wideAndDeepModel.eval_dir(), metric_name='auc', max_steps_without_decrease=1000, min_steps = 100)
early_stopping = tf.estimator.experimental.stop_if_no_increase_hook(wideAndDeepModel, 'auc', 1000, eval_dir = wideAndDeepModel.eval_dir(), min_steps = 1000, run_every_secs = None, run_every_steps = 3000)
early_stopping = tf.estimator.experimental.stop_if_no_increase_hook(wideAndDeepModel, 'auc', 5, eval_dir = wideAndDeepModel.eval_dir(), min_steps = 1000, run_every_secs = None, run_every_steps = 3000)
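# With max_steps_without_increase=5, training stops at the first periodic check (every 3000 steps,
# after at least 1000 steps) in which eval AUC has not improved within the last 5 global steps.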
hooks = [early_stopping]
train_spec = tf.estimator.TrainSpec(input_fn = lambda: input_fn(BASE_DIR + 'train_samples.csv', 100, True, 2048), hooks = hooks)
train_spec = tf.estimator.TrainSpec(input_fn = lambda: input_fn(DATA_DIR + 'train_samples.csv', 100, True, 2048), hooks = hooks)
serving_feature_spec = tf.feature_column.make_parse_example_spec(
    linear_feature_columns + dnn_feature_columns)
......@@ -247,7 +251,7 @@ exporter = tf.estimator.BestExporter(
    serving_input_receiver_fn = serving_input_receiver_fn,
    exports_to_keep = 3)
eval_spec = tf.estimator.EvalSpec(input_fn = lambda: input_fn(BASE_DIR + 'eval_samples.csv', 1, False, 2 ** 15), steps = None, throttle_secs = 120, exporters = exporter)
eval_spec = tf.estimator.EvalSpec(input_fn = lambda: input_fn(DATA_DIR + 'eval_samples.csv', 1, False, 2 ** 15), steps = None, throttle_secs = 120, exporters = exporter)
# def my_auc(labels, predictions):
# return {'auc_pr_careful_interpolation': tf.metrics.auc(labels, predictions['logistic'], curve='ROC',
......@@ -257,6 +261,17 @@ eval_spec = tf.estimator.EvalSpec(input_fn = lambda: input_fn(BASE_DIR + 'eval_s
tf.estimator.train_and_evaluate(wideAndDeepModel, train_spec, eval_spec)
wideAndDeepModel.evaluate(lambda: input_fn(BASE_DIR + 'eval_samples.csv', 1, False, 2 ** 15))
wideAndDeepModel.evaluate(lambda: input_fn(DATA_DIR + 'eval_samples.csv', 1, False, 2 ** 15))
wideAndDeepModel.export_saved_model(MODEL_BASE_DIR, serving_input_receiver_fn, as_text = False)
# Rename the timestamped export directory to a YYYYMMDD name (assumes MODEL_BASE_DIR contains only the new export).
filenames = os.listdir(MODEL_BASE_DIR)
filename = filenames[0]
from datetime import date
filename_ = date.fromtimestamp(int(filename)).strftime('%Y%m%d')
os.rename(os.path.join(MODEL_BASE_DIR, filename), os.path.join(MODEL_BASE_DIR, filename_))
print("Training time: {}s".format(time.time() - start))
\ No newline at end of file