Commit f84b3822 authored by 张彦钊's avatar 张彦钊

esm pyspark 代码重构

parent dccbc937
......@@ -206,7 +206,7 @@ def feature_engineer():
# TODO 上线后把下面train fliter 删除,因为最近一天的数据也要作为训练集
train = rdd.filter(lambda x: x[0] != validate_date).map(
train = rdd.map(
lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
x[10], x[11], x[12], x[13]))
f = time.time()
......
#! /bin/bash
git checkout master
PYTHON_PATH=/opt/anaconda3/envs/esmm/bin/python
MODEL_PATH=/srv/apps/ffm-baseline/eda/esmm/Model_pipline
DATA_PATH=/home/gmuser/esmm
echo "rm leave tfrecord"
rm ${DATA_PATH}/tr/*
rm ${DATA_PATH}/va/*
rm ${DATA_PATH}/native/*
rm ${DATA_PATH}/nearby/*
rm -r ${DATA_PATH}/model_ckpt/DeepCvrMTL/20*
echo "data"
${PYTHON_PATH} ${MODEL_PATH}/feature.py > ${DATA_PATH}/feature.log
echo "csv to tfrecord"
${PYTHON_PATH} ${MODEL_PATH}/to_tfrecord.py --input_dir=${DATA_PATH}/tr/ --output_dir=${DATA_PATH}/tr/
${PYTHON_PATH} ${MODEL_PATH}/to_tfrecord.py --input_dir=${DATA_PATH}/va/ --output_dir=${DATA_PATH}/va/
${PYTHON_PATH} ${MODEL_PATH}/to_tfrecord.py --input_dir=${DATA_PATH}/native/ --output_dir=${DATA_PATH}/native/
${PYTHON_PATH} ${MODEL_PATH}/to_tfrecord.py --input_dir=${DATA_PATH}/nearby/ --output_dir=${DATA_PATH}/nearby/
cat ${DATA_PATH}/tr/*.tfrecord > ${DATA_PATH}/tr/tr.tfrecord
cat ${DATA_PATH}/va/*.tfrecord > ${DATA_PATH}/va/va.tfrecord
cat ${DATA_PATH}/native/*.tfrecord > ${DATA_PATH}/native/native.tfrecord
cat ${DATA_PATH}/nearby/*.tfrecord > ${DATA_PATH}/nearby/nearby.tfrecord
rm ${DATA_PATH}/tr/tr_*
rm ${DATA_PATH}/va/va_*
rm ${DATA_PATH}/native/native_*
rm ${DATA_PATH}/nearby/nearby_*
PYTHON_PATH=/srv/envs/esmm/bin/python
MODEL_PATH=/srv/apps/ffm-baseline_git/eda/esmm/Model_pipline
LOCAL_PATH=/home/gmuser/esmm
HDFS_PATH=hdfs://172.16.32.4:8020/strategy/esmm
echo "rm model file"
rm -r ${LOCAL_PATH}/model_ckpt/DeepCvrMTL/20*
echo "train..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=2000 --field_size=15 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train
CLASSPATH="$(hadoop classpath --glob)" ${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=10000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${LOCAL_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH} --task_type=train
echo "infer native..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=2000 --field_size=15 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer
CLASSPATH="$(hadoop classpath --glob)" ${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=10000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${LOCAL_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH}/native --hdfs_dir=${HDFS_PATH}/native --task_type=infer
echo "infer nearby..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=2000 --field_size=15 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer
CLASSPATH="$(hadoop classpath --glob)" ${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=10000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${LOCAL_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH}/nearby --hdfs_dir=${HDFS_PATH}/nearby --task_type=infer
echo "sort and 2sql"
${PYTHON_PATH} ${MODEL_PATH}/to_database.py
\ No newline at end of file
......@@ -81,7 +81,6 @@ def main():
tmp = str(to_delete[start:end]).strip('[]')
df_merge_str.append(tmp)
try:
for i in df_merge_str:
delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(i)
con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
......@@ -91,11 +90,24 @@ def main():
print("delete done")
con.close()
engine = create_engine(str(r"mysql+pymysql://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False,chunksize=8000)
df_all.to_sql('esmm_device_diary_queue', con=engine, if_exists='append', index=False, chunksize=8000)
print("insert done")
except Exception as e:
print(e)
# try:
# for i in df_merge_str:
# delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(i)
# con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
# cur = con.cursor()
# cur.execute(delete_str)
# con.commit()
# print("delete done")
# con.close()
# engine = create_engine(str(r"mysql+pymysql://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
# df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False,chunksize=8000)
# print("insert done")
#
# except Exception as e:
# print(e)
if __name__ == '__main__':
......
......@@ -8,11 +8,9 @@
import shutil
import os
import json
import glob
from datetime import date, timedelta
import random
import tensorflow as tf
import subprocess
#################### CMD Arguments ####################
FLAGS = tf.app.flags.FLAGS
......@@ -37,7 +35,8 @@ tf.app.flags.DEFINE_string("deep_layers", '256,128,64', "deep layers")
tf.app.flags.DEFINE_string("dropout", '0.5,0.5,0.5', "dropout rate")
tf.app.flags.DEFINE_boolean("batch_norm", False, "perform batch normaization (True or False)")
tf.app.flags.DEFINE_float("batch_norm_decay", 0.9, "decay for the moving average(recommend trying decay=0.9)")
tf.app.flags.DEFINE_string("data_dir", '', "data dir")
tf.app.flags.DEFINE_string("hdfs_dir", '', "hdfs dir")
tf.app.flags.DEFINE_string("local_dir", '', "local dir")
tf.app.flags.DEFINE_string("dt_dir", '', "data dt partition")
tf.app.flags.DEFINE_string("model_dir", '', "model check point dir")
tf.app.flags.DEFINE_string("servable_model_dir", '', "export servable model for TensorFlow Serving")
......@@ -63,7 +62,6 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
"tag6_list": tf.VarLenFeature(tf.int64),
"tag7_list": tf.VarLenFeature(tf.int64)
}
parsed = tf.parse_single_example(record, features)
y = parsed.pop('y')
z = parsed.pop('z')
......@@ -137,8 +135,8 @@ def model_fn(features, labels, mode, params):
tag7 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag7_list, sp_weights=None, combiner="sum")
# x_concat = tf.reshape(embedding_id,shape=[-1, common_dims]) # None * (F * K)
x_concat = tf.concat([tf.reshape(embedding_id,shape=[-1,common_dims]),app_id,level2,level3,tag1,
tag2,tag3,tag4,tag5,tag6,tag7], axis=1)
x_concat = tf.concat([tf.reshape(embedding_id, shape=[-1, common_dims]), app_id, level2, level3, tag1,
tag2, tag3, tag4, tag5, tag6, tag7], axis=1)
with tf.name_scope("CVR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN:
......@@ -301,7 +299,8 @@ def main(_):
print('task_type ', FLAGS.task_type)
print('model_dir ', FLAGS.model_dir)
print('data_dir ', FLAGS.data_dir)
print('hdfs_dir ', FLAGS.hdfs_dir)
print('local_dir ', FLAGS.local_dir)
print('dt_dir ', FLAGS.dt_dir)
print('num_epochs ', FLAGS.num_epochs)
print('feature_size ', FLAGS.feature_size)
......@@ -317,13 +316,13 @@ def main(_):
print('ctr_task_wgt ', FLAGS.ctr_task_wgt)
#------init Envs------
tr_files = glob.glob("%s/tr/*tfrecord" % FLAGS.data_dir)
random.shuffle(tr_files)
print("tr_files:", tr_files)
va_files = glob.glob("%s/va/*tfrecord" % FLAGS.data_dir)
print("va_files:", va_files)
te_files = glob.glob("%s/*tfrecord" % FLAGS.data_dir)
print("te_files:", te_files)
# tr_files = [path+"tr/part-r-00000"]
# va_files = [path+"va/part-r-00000"]
# te_files = ["%s/part-r-00000" % FLAGS.hdfs_dir]
tr_files = get_filename("/strategy/esmm/tr")
va_files = get_filename("/strategy/esmm/va")
te_files = ["%s/part-r-00000" % FLAGS.hdfs_dir]
if FLAGS.clear_existing_model:
try:
......@@ -362,14 +361,33 @@ def main(_):
print('%s: %s' % (key,value))
elif FLAGS.task_type == 'infer':
preds = Estimator.predict(input_fn=lambda: input_fn(te_files, num_epochs=1, batch_size=FLAGS.batch_size), predict_keys=["pctcvr","pctr","pcvr"])
with open(FLAGS.data_dir+"/pred.txt", "w") as fo:
with open(FLAGS.local_dir+"/pred.txt", "w") as fo:
print("-"*100)
with open(FLAGS.data_dir + "/pred.txt", "w") as fo:
with open(FLAGS.local_dir + "/pred.txt", "w") as fo:
for prob in preds:
fo.write("%f\t%f\t%f\n" % (prob['pctr'], prob['pcvr'], prob['pctcvr']))
elif FLAGS.task_type == 'export':
print("Not Implemented, Do It Yourself!")
def get_filename(dir_in):
pre_path = "hdfs://172.16.32.4:8020"
args = "hdfs dfs -ls " + dir_in + " | awk '{print $8}'"
proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
s_output, s_err = proc.communicate()
all_dart_dirs = s_output.split()
a = []
for i in all_dart_dirs:
b = str(i).split("/")[4]
if b[:4] == "part":
tmp = pre_path + str(i)[2:-1]
a.append(tmp)
return a
if __name__ == "__main__":
# classpath = "$CLASSPATH:%JAVA_HOME%/lib/dt.jar:%JAVA_HOME%/lib/tools.jar:$(/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/api-asn1-api-1.0.0-M20.jar:/opt/hadoop/share/hadoop/common/lib/hadoop-annotations-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/common/lib/activation-1.1.jar:/opt/hadoop/share/hadoop/common/lib/commons-codec-1.4.jar:/opt/hadoop/share/hadoop/common/lib/jasper-runtime-5.5.23.jar:/opt/hadoop/share/hadoop/common/lib/jsch-0.1.42.jar:/opt/hadoop/share/hadoop/common/lib/hadoop-auth-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/common/lib/jsp-api-2.1.jar:/opt/hadoop/share/hadoop/common/lib/asm-3.2.jar:/opt/hadoop/share/hadoop/common/lib/commons-lang-2.6.jar:/opt/hadoop/share/hadoop/common/lib/commons-beanutils-1.9.2.jar:/opt/hadoop/share/hadoop/common/lib/zookeeper-3.4.5-cdh5.16.1.jar:/opt/hadoop/share/hadoop/common/lib/api-util-1.0.0-M20.jar:/opt/hadoop/share/hadoop/common/lib/jetty-util-6.1.26.cloudera.4.jar:/opt/hadoop/share/hadoop/common/lib/snappy-java-1.0.4.1.jar:/opt/hadoop/share/hadoop/common/lib/guava-11.0.2.jar:/opt/hadoop/share/hadoop/common/lib/apacheds-kerberos-codec-2.0.0-M15.jar:/opt/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar:/opt/hadoop/share/hadoop/common/lib/servlet-api-2.5.jar:/opt/hadoop/share/hadoop/common/lib/commons-collections-3.2.2.jar:/opt/hadoop/share/hadoop/common/lib/jersey-server-1.9.jar:/opt/hadoop/share/hadoop/common/lib/commons-digester-1.8.jar:/opt/hadoop/share/hadoop/common/lib/jasper-compiler-5.5.23.jar:/opt/hadoop/share/hadoop/common/lib/java-xmlbuilder-0.4.jar:/opt/hadoop/share/hadoop/common/lib/curator-client-2.7.1.jar:/opt/hadoop/share/hadoop/common/lib/commons-logging-1.1.3.jar:/opt/hadoop/share/hadoop/common/lib/jackson-jaxrs-1.8.10.jar:/opt/hadoop/share/hadoop/common/lib/jaxb-impl-2.2.3-1.jar:/opt/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar:/opt/hadoop/share/hadoop/common/lib/gson-2.2.4.jar:/opt/hadoop/share/hadoop/common/lib/commons-configuration-1.6.jar:/opt/hadoop/share/hadoop/common/lib/commons-httpclient-3.1.jar:/opt/hadoop/share/hadoop/common/lib/hamcrest-core-1.3.jar:/opt/hadoop/share/hadoop/common/lib/httpclient-4.2.5.jar:/opt/hadoop/share/hadoop/common/lib/jets3t-0.9.0.jar:/opt/hadoop/share/hadoop/common/lib/xmlenc-0.52.jar:/opt/hadoop/share/hadoop/common/lib/logredactor-1.0.3.jar:/opt/hadoop/share/hadoop/common/lib/slf4j-api-1.7.5.jar:/opt/hadoop/share/hadoop/common/lib/htrace-core4-4.0.1-incubating.jar:/opt/hadoop/share/hadoop/common/lib/curator-recipes-2.7.1.jar:/opt/hadoop/share/hadoop/common/lib/apacheds-i18n-2.0.0-M15.jar:/opt/hadoop/share/hadoop/common/lib/jsr305-3.0.0.jar:/opt/hadoop/share/hadoop/common/lib/log4j-1.2.17.jar:/opt/hadoop/share/hadoop/common/lib/xz-1.0.jar:/opt/hadoop/share/hadoop/common/lib/junit-4.11.jar:/opt/hadoop/share/hadoop/common/lib/jaxb-api-2.2.2.jar:/opt/hadoop/share/hadoop/common/lib/commons-beanutils-core-1.8.0.jar:/opt/hadoop/share/hadoop/common/lib/commons-compress-1.4.1.jar:/opt/hadoop/share/hadoop/common/lib/commons-net-3.1.jar:/opt/hadoop/share/hadoop/common/lib/jersey-json-1.9.jar:/opt/hadoop/share/hadoop/common/lib/stax-api-1.0-2.jar:/opt/hadoop/share/hadoop/common/lib/commons-el-1.0.jar:/opt/hadoop/share/hadoop/common/lib/mockito-all-1.8.5.jar:/opt/hadoop/share/hadoop/common/lib/jetty-6.1.26.cloudera.4.jar:/opt/hadoop/share/hadoop/common/lib/jettison-1.1.jar:/opt/hadoop/share/hadoop/common/lib/protobuf-java-2.5.0.jar:/opt/hadoop/share/hadoop/common/lib/avro-1.7.6-cdh5.16.1.jar:/opt/hadoop/share/hadoop/common/lib/httpcore-4.2.5.jar:/opt/hadoop/share/hadoop/common/lib/commons-io-2.4.jar:/opt/hadoop/share/hadoop/common/lib/netty-3.10.5.Final.jar:/opt/hadoop/share/hadoop/common/lib/paranamer-2.3.jar:/opt/hadoop/share/hadoop/common/lib/curator-framework-2.7.1.jar:/opt/hadoop/share/hadoop/common/lib/jackson-xc-1.8.10.jar:/opt/hadoop/share/hadoop/common/lib/commons-math3-3.1.1.jar:/opt/hadoop/share/hadoop/common/lib/jersey-core-1.9.jar:/opt/hadoop/share/hadoop/common/hadoop-common-2.6.0-cdh5.16.1-tests.jar:/opt/hadoop/share/hadoop/common/hadoop-nfs-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/common/hadoop-common-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/commons-codec-1.4.jar:/opt/hadoop/share/hadoop/hdfs/lib/jasper-runtime-5.5.23.jar:/opt/hadoop/share/hadoop/hdfs/lib/jsp-api-2.1.jar:/opt/hadoop/share/hadoop/hdfs/lib/asm-3.2.jar:/opt/hadoop/share/hadoop/hdfs/lib/commons-lang-2.6.jar:/opt/hadoop/share/hadoop/hdfs/lib/jetty-util-6.1.26.cloudera.4.jar:/opt/hadoop/share/hadoop/hdfs/lib/guava-11.0.2.jar:/opt/hadoop/share/hadoop/hdfs/lib/xml-apis-1.3.04.jar:/opt/hadoop/share/hadoop/hdfs/lib/commons-cli-1.2.jar:/opt/hadoop/share/hadoop/hdfs/lib/servlet-api-2.5.jar:/opt/hadoop/share/hadoop/hdfs/lib/commons-daemon-1.0.13.jar:/opt/hadoop/share/hadoop/hdfs/lib/jersey-server-1.9.jar:/opt/hadoop/share/hadoop/hdfs/lib/jackson-core-asl-1.8.10.jar:/opt/hadoop/share/hadoop/hdfs/lib/commons-logging-1.1.3.jar:/opt/hadoop/share/hadoop/hdfs/lib/jackson-mapper-asl-1.8.10-cloudera.1.jar:/opt/hadoop/share/hadoop/hdfs/lib/xmlenc-0.52.jar:/opt/hadoop/share/hadoop/hdfs/lib/htrace-core4-4.0.1-incubating.jar:/opt/hadoop/share/hadoop/hdfs/lib/jsr305-3.0.0.jar:/opt/hadoop/share/hadoop/hdfs/lib/log4j-1.2.17.jar:/opt/hadoop/share/hadoop/hdfs/lib/xercesImpl-2.9.1.jar:/opt/hadoop/share/hadoop/hdfs/lib/commons-el-1.0.jar:/opt/hadoop/share/hadoop/hdfs/lib/jetty-6.1.26.cloudera.4.jar:/opt/hadoop/share/hadoop/hdfs/lib/protobuf-java-2.5.0.jar:/opt/hadoop/share/hadoop/hdfs/lib/commons-io-2.4.jar:/opt/hadoop/share/hadoop/hdfs/lib/leveldbjni-all-1.8.jar:/opt/hadoop/share/hadoop/hdfs/lib/netty-3.10.5.Final.jar:/opt/hadoop/share/hadoop/hdfs/lib/jersey-core-1.9.jar:/opt/hadoop/share/hadoop/hdfs/hadoop-hdfs-nfs-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.16.1-tests.jar:/opt/hadoop/share/hadoop/yarn/lib/activation-1.1.jar:/opt/hadoop/share/hadoop/yarn/lib/commons-codec-1.4.jar:/opt/hadoop/share/hadoop/yarn/lib/aopalliance-1.0.jar:/opt/hadoop/share/hadoop/yarn/lib/asm-3.2.jar:/opt/hadoop/share/hadoop/yarn/lib/commons-lang-2.6.jar:/opt/hadoop/share/hadoop/yarn/lib/zookeeper-3.4.5-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/lib/guice-3.0.jar:/opt/hadoop/share/hadoop/yarn/lib/jetty-util-6.1.26.cloudera.4.jar:/opt/hadoop/share/hadoop/yarn/lib/guava-11.0.2.jar:/opt/hadoop/share/hadoop/yarn/lib/commons-cli-1.2.jar:/opt/hadoop/share/hadoop/yarn/lib/servlet-api-2.5.jar:/opt/hadoop/share/hadoop/yarn/lib/commons-collections-3.2.2.jar:/opt/hadoop/share/hadoop/yarn/lib/jersey-server-1.9.jar:/opt/hadoop/share/hadoop/yarn/lib/jackson-core-asl-1.8.10.jar:/opt/hadoop/share/hadoop/yarn/lib/commons-logging-1.1.3.jar:/opt/hadoop/share/hadoop/yarn/lib/jackson-jaxrs-1.8.10.jar:/opt/hadoop/share/hadoop/yarn/lib/jaxb-impl-2.2.3-1.jar:/opt/hadoop/share/hadoop/yarn/lib/jline-2.11.jar:/opt/hadoop/share/hadoop/yarn/lib/jackson-mapper-asl-1.8.10-cloudera.1.jar:/opt/hadoop/share/hadoop/yarn/lib/jersey-guice-1.9.jar:/opt/hadoop/share/hadoop/yarn/lib/jsr305-3.0.0.jar:/opt/hadoop/share/hadoop/yarn/lib/log4j-1.2.17.jar:/opt/hadoop/share/hadoop/yarn/lib/xz-1.0.jar:/opt/hadoop/share/hadoop/yarn/lib/javax.inject-1.jar:/opt/hadoop/share/hadoop/yarn/lib/jaxb-api-2.2.2.jar:/opt/hadoop/share/hadoop/yarn/lib/commons-compress-1.4.1.jar:/opt/hadoop/share/hadoop/yarn/lib/jersey-json-1.9.jar:/opt/hadoop/share/hadoop/yarn/lib/stax-api-1.0-2.jar:/opt/hadoop/share/hadoop/yarn/lib/jetty-6.1.26.cloudera.4.jar:/opt/hadoop/share/hadoop/yarn/lib/jettison-1.1.jar:/opt/hadoop/share/hadoop/yarn/lib/protobuf-java-2.5.0.jar:/opt/hadoop/share/hadoop/yarn/lib/guice-servlet-3.0.jar:/opt/hadoop/share/hadoop/yarn/lib/commons-io-2.4.jar:/opt/hadoop/share/hadoop/yarn/lib/leveldbjni-all-1.8.jar:/opt/hadoop/share/hadoop/yarn/lib/jersey-client-1.9.jar:/opt/hadoop/share/hadoop/yarn/lib/jackson-xc-1.8.10.jar:/opt/hadoop/share/hadoop/yarn/lib/jersey-core-1.9.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-client-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-server-web-proxy-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-server-common-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-server-tests-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-server-nodemanager-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-api-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-registry-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-server-applicationhistoryservice-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-common-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-server-resourcemanager-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-applications-unmanaged-am-launcher-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/lib/hadoop-annotations-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/lib/aopalliance-1.0.jar:/opt/hadoop/share/hadoop/mapreduce/lib/asm-3.2.jar:/opt/hadoop/share/hadoop/mapreduce/lib/guice-3.0.jar:/opt/hadoop/share/hadoop/mapreduce/lib/snappy-java-1.0.4.1.jar:/opt/hadoop/share/hadoop/mapreduce/lib/jersey-server-1.9.jar:/opt/hadoop/share/hadoop/mapreduce/lib/jackson-core-asl-1.8.10.jar:/opt/hadoop/share/hadoop/mapreduce/lib/jackson-mapper-asl-1.8.10-cloudera.1.jar:/opt/hadoop/share/hadoop/mapreduce/lib/hamcrest-core-1.3.jar:/opt/hadoop/share/hadoop/mapreduce/lib/jersey-guice-1.9.jar:/opt/hadoop/share/hadoop/mapreduce/lib/log4j-1.2.17.jar:/opt/hadoop/share/hadoop/mapreduce/lib/xz-1.0.jar:/opt/hadoop/share/hadoop/mapreduce/lib/junit-4.11.jar:/opt/hadoop/share/hadoop/mapreduce/lib/javax.inject-1.jar:/opt/hadoop/share/hadoop/mapreduce/lib/commons-compress-1.4.1.jar:/opt/hadoop/share/hadoop/mapreduce/lib/protobuf-java-2.5.0.jar:/opt/hadoop/share/hadoop/mapreduce/lib/avro-1.7.6-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/lib/guice-servlet-3.0.jar:/opt/hadoop/share/hadoop/mapreduce/lib/commons-io-2.4.jar:/opt/hadoop/share/hadoop/mapreduce/lib/leveldbjni-all-1.8.jar:/opt/hadoop/share/hadoop/mapreduce/lib/netty-3.10.5.Final.jar:/opt/hadoop/share/hadoop/mapreduce/lib/paranamer-2.3.jar:/opt/hadoop/share/hadoop/mapreduce/lib/jersey-core-1.9.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-hs-plugins-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-app-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-shuffle-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-nativetask-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.6.0-cdh5.16.1-tests.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-hs-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.6.0-cdh5.16.1.jar)"
# w
a = "export CLASSPATH='$(hadoop classpath --glob)'"
os.system(a)
path = "hdfs://172.16.32.4:8020/strategy/esmm/"
tf.logging.set_verbosity(tf.logging.INFO)
tf.app.run()
\ No newline at end of file
import pandas as pd
import pymysql
import datetime
def con_sql(db,sql):
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
db.close()
return df
def multi_hot(df,column,n):
df[column] = df[column].fillna("lost_na")
app_list_value = [i.split(",") for i in df[column].unique()]
app_list_unique = []
for i in app_list_value:
app_list_unique.extend(i)
app_list_unique = list(set(app_list_unique))
number = len(app_list_unique)
app_list_map = dict(zip(app_list_unique, list(range(n, number + n))))
df[column] = df[column].apply(app_list_func, args=(app_list_map,))
return number,app_list_map
def get_data():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from {}".format(train_data_set)
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=10)).strftime("%Y-%m-%d")
print(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
"u.channel,c.top,e.device_id,cut.time,dl.app_list,e.diary_service_id,feat.level3_ids,feat.level2 " \
"from {} e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_time_cut cut on e.cid_id = cut.cid " \
"left join device_app_list dl on e.device_id = dl.device_id " \
"left join diary_feat feat on e.cid_id = feat.diary_id " \
"where e.stat_date >= '{}'".format(train_data_set,start)
df = con_sql(db, sql)
df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "device_id",
11: "time",12:"app_list",13:"service_id",14:"level3_ids",15:"level2"})
print("esmm data ok")
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select level2_id,treatment_method,price_min,price_max,treatment_time,maintain_time,recover_time " \
"from train_Knowledge_network_data"
knowledge = con_sql(db, sql)
knowledge = knowledge.rename(columns={0: "level2", 1: "method", 2: "min", 3: "max",
4: "treatment_time", 5: "maintain_time",6: "recover_time"})
knowledge["level2"] = knowledge["level2"].astype("str")
df = pd.merge(df, knowledge, on='level2', how='left')
df = df.drop("level2", axis=1)
service_id = tuple(df["service_id"].unique())
db = pymysql.connect(host='rdsfewzdmf0jfjp9un8xj.mysql.rds.aliyuncs.com', port=3306, user='work',
passwd='BJQaT9VzDcuPBqkd', db='zhengxing')
sql = "select s.id,d.hospital_id from api_service s left join api_doctor d on s.doctor_id = d.id " \
"where s.id in {}".format(service_id)
hospital = con_sql(db, sql)
hospital = hospital.rename(columns={0: "service_id", 1: "hospital_id"})
# print(hospital.head())
# print("hospital")
# print(hospital.count())
hospital["service_id"] = hospital["service_id"].astype("str")
df = pd.merge(df, hospital, on='service_id', how='left')
df = df.drop("service_id", axis=1)
print(df.count())
print("before")
print(df.shape)
df = df.drop_duplicates(["ucity_id", "clevel2_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids"])
print("去重后样本数量:",df.shape)
app_list_number,app_list_map = multi_hot(df,"app_list",2)
level2_number,level2_map = multi_hot(df,"clevel2_id",2+app_list_number)
level3_number,level3_map = multi_hot(df, "level3_ids", 2 + app_list_number + level2_number)
unique_values = []
features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date","hospital_id",
"method", "min", "max", "treatment_time", "maintain_time", "recover_time"]
for i in features:
df[i] = df[i].astype("str")
df[i] = df[i].fillna("lost")
# 下面这行代码是为了区分不同的列中有相同的值
df[i] = df[i] + i
unique_values.extend(list(df[i].unique()))
temp = list(range(2+app_list_number+level2_number + level3_number, 2 + app_list_number+level2_number + level3_number + len(unique_values)))
value_map = dict(zip(unique_values,temp))
df = df.drop("device_id", axis=1)
train = df[df["stat_date"] != validate_date+"stat_date"]
test = df[df["stat_date"] == validate_date+"stat_date"]
for i in ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date","hospital_id",
"method", "min", "max", "treatment_time", "maintain_time", "recover_time"]:
train[i] = train[i].map(value_map)
test[i] = test[i].map(value_map)
print("train shape")
print(train.shape)
print("test shape")
print(test.shape)
write_csv(train, "tr",100000)
write_csv(test, "va",80000)
return validate_date,value_map,app_list_map,level2_map,level3_map
def app_list_func(x,l):
b = x.split(",")
e = []
for i in b:
if i in l.keys():
e.append(l[i])
else:
e.append(0)
return ",".join([str(j) for j in e])
def write_csv(df,name,n):
for i in range(0, df.shape[0], n):
if i == 0:
temp = df.iloc[0:n]
elif i + n > df.shape[0]:
temp = df.iloc[i:]
else:
temp = df.iloc[i:i + n]
temp.to_csv(path + name+ "/{}_{}.csv".format(name,i), index=False)
def get_predict(date,value_map,app_list_map,level2_map,level3_map):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.label,e.ucity_id,feat.level2_ids,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \
"dl.app_list,e.hospital_id,feat.level3_ids,feat.level2 " \
"from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_time_cut cut on e.cid_id = cut.cid " \
"left join device_app_list dl on e.device_id = dl.device_id " \
"left join diary_feat feat on e.cid_id = feat.diary_id"
df = con_sql(db, sql)
df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top",10: "device_id",
11: "cid_id", 12: "time",13:"app_list",14:"hospital_id",15:"level3_ids",
16: "level2"})
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select level2_id,treatment_method,price_min,price_max,treatment_time,maintain_time,recover_time " \
"from train_Knowledge_network_data"
knowledge = con_sql(db, sql)
knowledge = knowledge.rename(columns={0: "level2", 1: "method", 2: "min", 3: "max",
4: "treatment_time", 5: "maintain_time", 6: "recover_time"})
knowledge["level2"] = knowledge["level2"].astype("str")
df = pd.merge(df, knowledge, on='level2', how='left')
df = df.drop("level2", axis=1)
df = df.drop_duplicates(["ucity_id", "clevel2_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "app_list", "hospital_id", "level3_ids"])
df["stat_date"] = date
print(df.head(6))
df["app_list"] = df["app_list"].fillna("lost_na")
df["app_list"] = df["app_list"].apply(app_list_func,args=(app_list_map,))
df["clevel2_id"] = df["clevel2_id"].fillna("lost_na")
df["clevel2_id"] = df["clevel2_id"].apply(app_list_func, args=(level2_map,))
df["level3_ids"] = df["level3_ids"].fillna("lost_na")
df["level3_ids"] = df["level3_ids"].apply(app_list_func, args=(level3_map,))
# print("predict shape")
# print(df.shape)
df["uid"] = df["device_id"]
df["city"] = df["ucity_id"]
features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date","hospital_id",
"method", "min", "max", "treatment_time", "maintain_time", "recover_time"]
for i in features:
df[i] = df[i].astype("str")
df[i] = df[i].fillna("lost")
df[i] = df[i] + i
native_pre = df[df["label"] == 0]
native_pre = native_pre.drop("label", axis=1)
nearby_pre = df[df["label"] == 1]
nearby_pre = nearby_pre.drop("label", axis=1)
for i in ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date","hospital_id",
"method", "min", "max", "treatment_time", "maintain_time", "recover_time"]:
native_pre[i] = native_pre[i].map(value_map)
# TODO 没有覆盖到的类别会处理成na,暂时用0填充,后续完善一下
native_pre[i] = native_pre[i].fillna(0)
nearby_pre[i] = nearby_pre[i].map(value_map)
# TODO 没有覆盖到的类别会处理成na,暂时用0填充,后续完善一下
nearby_pre[i] = nearby_pre[i].fillna(0)
print("native")
print(native_pre.shape)
native_pre[["uid","city","cid_id"]].to_csv(path+"native.csv",index=False)
write_csv(native_pre, "native",200000)
print("nearby")
print(nearby_pre.shape)
nearby_pre[["uid","city","cid_id"]].to_csv(path+"nearby.csv",index=False)
write_csv(nearby_pre, "nearby", 160000)
if __name__ == '__main__':
train_data_set = "esmm_train_data"
path = "/data/esmm/"
date,value,app_list,level2,level3 = get_data()
get_predict(date, value,app_list,level2,level3)
#coding=utf-8
from sqlalchemy import create_engine
import pandas as pd
import pymysql
import time
def con_sql(sql):
"""
:type sql : str
:rtype : tuple
"""
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
db.close()
return result
def nearby_set_join(lst):
# return ','.join([str(i) for i in list(lst)])
return ','.join([str(i) for i in lst.unique().tolist()])
def native_set_join(lst):
l = lst.unique().tolist()
d = int(len(l)/2)
if d == 0:
d = 1
r = [str(i) for i in l]
r =r[:d]
return ','.join(r)
def main():
# native queue
df2 = pd.read_csv('/data/esmm/native.csv')
df2['cid_id'] = df2['cid_id'].astype(str)
df1 = pd.read_csv("/data/esmm/native/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df2["ctr"],df2["cvr"],df2["ctcvr"] = df1["ctr"],df1["cvr"],df1["ctcvr"]
df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':native_set_join}).reset_index(drop=False)
df3.columns = ["device_id","city_id","native_queue"]
print("native_device_count",df3.shape)
# nearby queue
df2 = pd.read_csv('/data/esmm/nearby.csv')
df2['cid_id'] = df2['cid_id'].astype(str)
df1 = pd.read_csv("/data/esmm/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df2["ctr"], df2["cvr"], df2["ctcvr"] = df1["ctr"], df1["cvr"], df1["ctcvr"]
df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':nearby_set_join}).reset_index(drop=False)
df4.columns = ["device_id","city_id","nearby_queue"]
print("nearby_device_count",df4.shape)
#union
df_all = pd.merge(df3,df4,on=['device_id','city_id'],how='outer').fillna("")
df_all['device_id'] = df_all['device_id'].astype(str)
df_all['city_id'] = df_all['city_id'].astype(str)
ctime = int(time.time())
df_all["time"] = ctime
print("union_device_count",df_all.shape)
host='10.66.157.22'
port=4000
user='root'
password='3SYz54LS9#^9sBvC'
db='jerry_test'
charset='utf8'
engine = create_engine(str(r"mysql+mysqldb://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
df_merge = df_all['device_id'] + df_all['city_id']
df_merge_str = (str(list(df_merge.values))).strip('[]')
try:
# df_merge = df_all[['device_id','city_id']].apply(lambda x: ''.join(x),axis=1)
delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(df_merge_str)
con = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cur = con.cursor()
cur.execute(delete_str)
con.commit()
df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False,chunksize=8000)
except Exception as e:
print(e)
print("done")
if __name__ == '__main__':
main()
\ No newline at end of file
#coding=utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pandas as pd
import os
import glob
import tensorflow as tf
import numpy as np
from multiprocessing import Pool as ThreadPool
flags = tf.app.flags
FLAGS = flags.FLAGS
LOG = tf.logging
tf.app.flags.DEFINE_string("input_dir", "./", "input dir")
tf.app.flags.DEFINE_string("output_dir", "./", "output dir")
tf.app.flags.DEFINE_integer("threads", 16, "threads num")
def gen_tfrecords(in_file):
basename = os.path.basename(in_file) + ".tfrecord"
out_file = os.path.join(FLAGS.output_dir, basename)
tfrecord_out = tf.python_io.TFRecordWriter(out_file)
df = pd.read_csv(in_file)
for i in range(df.shape[0]):
feats = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date","hospital_id",
"method", "min", "max", "treatment_time", "maintain_time", "recover_time"]
id = np.array([])
for j in feats:
id = np.append(id,df[j][i])
app_list = np.array(str(df["app_list"][i]).split(","))
level2_list = np.array(str(df["clevel2_id"][i]).split(","))
level3_list = np.array(str(df["level3_ids"][i]).split(","))
features = tf.train.Features(feature={
"y": tf.train.Feature(float_list=tf.train.FloatList(value=[df["y"][i]])),
"z": tf.train.Feature(float_list=tf.train.FloatList(value=[df["z"][i]])),
"ids": tf.train.Feature(int64_list=tf.train.Int64List(value=id.astype(np.int))),
"app_list":tf.train.Feature(int64_list=tf.train.Int64List(value=app_list.astype(np.int))),
"level2_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level2_list.astype(np.int))),
"level3_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level3_list.astype(np.int)))
})
example = tf.train.Example(features = features)
serialized = example.SerializeToString()
tfrecord_out.write(serialized)
tfrecord_out.close()
def main(_):
if not os.path.exists(FLAGS.output_dir):
os.mkdir(FLAGS.output_dir)
file_list = glob.glob(os.path.join(FLAGS.input_dir, "*.csv"))
print("total files: %d" % len(file_list))
pool = ThreadPool(FLAGS.threads) # Sets the pool size
pool.map(gen_tfrecords, file_list)
pool.close()
pool.join()
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
tf.app.run()
\ No newline at end of file
#! /bin/bash
git checkout master
PYTHON_PATH=/home/gaoyazhe/miniconda3/bin/python
MODEL_PATH=/srv/apps/ffm-baseline/tensnsorflow/es
DATA_PATH=/data/esmm
PYTHON_PATH=/opt/anaconda3/envs/esmm/bin/python
MODEL_PATH=/srv/apps/ffm-baseline/eda/esmm/Model_pipline
DATA_PATH=/home/gmuser/esmm
echo "rm leave tfrecord"
rm ${DATA_PATH}/tr/*
......@@ -32,15 +32,13 @@ rm ${DATA_PATH}/nearby/nearby_*
echo "train..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=15 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=2000 --field_size=15 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train
echo "infer native..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=15 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer > ${DATA_PATH}/native_infer.log
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=2000 --field_size=15 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer
echo "infer nearby..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=15 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer > ${DATA_PATH}/nearby_infer.log
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=2000 --field_size=15 --feature_size=300000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer
echo "sort and 2sql"
${PYTHON_PATH} ${MODEL_PATH}/to_database.py > ${DATA_PATH}/insert_database.log
......@@ -81,6 +81,7 @@ def main():
tmp = str(to_delete[start:end]).strip('[]')
df_merge_str.append(tmp)
try:
for i in df_merge_str:
delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(i)
con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
......@@ -90,24 +91,11 @@ def main():
print("delete done")
con.close()
engine = create_engine(str(r"mysql+pymysql://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
df_all.to_sql('esmm_device_diary_queue', con=engine, if_exists='append', index=False, chunksize=8000)
df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False,chunksize=8000)
print("insert done")
# try:
# for i in df_merge_str:
# delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(i)
# con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
# cur = con.cursor()
# cur.execute(delete_str)
# con.commit()
# print("delete done")
# con.close()
# engine = create_engine(str(r"mysql+pymysql://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
# df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False,chunksize=8000)
# print("insert done")
#
# except Exception as e:
# print(e)
except Exception as e:
print(e)
if __name__ == '__main__':
......
......@@ -6,7 +6,6 @@
#import argparse
import shutil
#import sys
import os
import json
import glob
......@@ -55,7 +54,14 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
"ids": tf.FixedLenFeature([FLAGS.field_size], tf.int64),
"app_list": tf.VarLenFeature(tf.int64),
"level2_list": tf.VarLenFeature(tf.int64),
"level3_list": tf.VarLenFeature(tf.int64)
"level3_list": tf.VarLenFeature(tf.int64),
"tag1_list": tf.VarLenFeature(tf.int64),
"tag2_list": tf.VarLenFeature(tf.int64),
"tag3_list": tf.VarLenFeature(tf.int64),
"tag4_list": tf.VarLenFeature(tf.int64),
"tag5_list": tf.VarLenFeature(tf.int64),
"tag6_list": tf.VarLenFeature(tf.int64),
"tag7_list": tf.VarLenFeature(tf.int64)
}
parsed = tf.parse_single_example(record, features)
......@@ -103,6 +109,14 @@ def model_fn(features, labels, mode, params):
feat_ids = features['ids']
app_list = features['app_list']
level2_list = features['level2_list']
level3_list = features['level3_list']
tag1_list = features['tag1_list']
tag2_list = features['tag2_list']
tag3_list = features['tag3_list']
tag4_list = features['tag4_list']
tag5_list = features['tag5_list']
tag6_list = features['tag6_list']
tag7_list = features['tag7_list']
if FLAGS.task_type != "infer":
y = labels['y']
......@@ -113,10 +127,18 @@ def model_fn(features, labels, mode, params):
embedding_id = tf.nn.embedding_lookup(Feat_Emb,feat_ids)
app_id = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=app_list, sp_weights=None, combiner="sum")
level2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=level2_list, sp_weights=None, combiner="sum")
level3 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=level3_list, sp_weights=None, combiner="sum")
tag1 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag1_list, sp_weights=None, combiner="sum")
tag2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag2_list, sp_weights=None, combiner="sum")
tag3 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag3_list, sp_weights=None, combiner="sum")
tag4 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag4_list, sp_weights=None, combiner="sum")
tag5 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag5_list, sp_weights=None, combiner="sum")
tag6 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag6_list, sp_weights=None, combiner="sum")
tag7 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag7_list, sp_weights=None, combiner="sum")
# x_concat = tf.reshape(embedding_id,shape=[-1, common_dims]) # None * (F * K)
x_concat = tf.concat([tf.reshape(embedding_id,shape=[-1,common_dims]),app_id,level2], axis=1)
x_concat = tf.concat([tf.reshape(embedding_id,shape=[-1,common_dims]),app_id,level2,level3,tag1,
tag2,tag3,tag4,tag5,tag6,tag7], axis=1)
with tf.name_scope("CVR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN:
......
#! /bin/bash
git checkout master
PYTHON_PATH=/srv/envs/esmm/bin/python
MODEL_PATH=/srv/apps/ffm-baseline_git/tensnsorflow
LOCAL_PATH=/home/gmuser/esmm
HDFS_PATH=hdfs://172.16.32.4:8020/strategy/esmm
echo "train..."
CLASSPATH="$(hadoop classpath --glob)" ${PYTHON_PATH} ${MODEL_PATH}/train_multi.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=10000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${LOCAL_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH} --task_type=train
echo "infer native..."
CLASSPATH="$(hadoop classpath --glob)" ${PYTHON_PATH} ${MODEL_PATH}/train_multi.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=10000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${LOCAL_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH}/native --hdfs_dir=${HDFS_PATH}/native --task_type=infer
echo "infer nearby..."
CLASSPATH="$(hadoop classpath --glob)" ${PYTHON_PATH} ${MODEL_PATH}/train_multi.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=10000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${LOCAL_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH}/nearby --hdfs_dir=${HDFS_PATH}/nearby --task_type=infer
echo "sort and 2sql"
${PYTHON_PATH} ${MODEL_PATH}/to_database.py
\ No newline at end of file
#coding=utf-8
#from __future__ import absolute_import
#from __future__ import division
#from __future__ import print_function
#import argparse
import shutil
import os
import json
from datetime import date, timedelta
import tensorflow as tf
import subprocess
#################### CMD Arguments ####################
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_integer("dist_mode", 0, "distribuion mode {0-loacal, 1-single_dist, 2-multi_dist}")
tf.app.flags.DEFINE_string("ps_hosts", '', "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", '', "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("job_name", '', "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("num_threads", 16, "Number of threads")
tf.app.flags.DEFINE_integer("feature_size", 0, "Number of features")
tf.app.flags.DEFINE_integer("field_size", 0, "Number of common fields")
tf.app.flags.DEFINE_integer("embedding_size", 32, "Embedding size")
tf.app.flags.DEFINE_integer("num_epochs", 10, "Number of epochs")
tf.app.flags.DEFINE_integer("batch_size", 64, "Number of batch size")
tf.app.flags.DEFINE_integer("log_steps", 1000, "save summary every steps")
tf.app.flags.DEFINE_float("learning_rate", 0.0005, "learning rate")
tf.app.flags.DEFINE_float("l2_reg", 0.0001, "L2 regularization")
tf.app.flags.DEFINE_string("loss_type", 'log_loss', "loss type {square_loss, log_loss}")
tf.app.flags.DEFINE_float("ctr_task_wgt", 0.5, "loss weight of ctr task")
tf.app.flags.DEFINE_string("optimizer", 'Adam', "optimizer type {Adam, Adagrad, GD, Momentum}")
tf.app.flags.DEFINE_string("deep_layers", '256,128,64', "deep layers")
tf.app.flags.DEFINE_string("dropout", '0.5,0.5,0.5', "dropout rate")
tf.app.flags.DEFINE_boolean("batch_norm", False, "perform batch normaization (True or False)")
tf.app.flags.DEFINE_float("batch_norm_decay", 0.9, "decay for the moving average(recommend trying decay=0.9)")
tf.app.flags.DEFINE_string("hdfs_dir", '', "hdfs dir")
tf.app.flags.DEFINE_string("local_dir", '', "local dir")
tf.app.flags.DEFINE_string("dt_dir", '', "data dt partition")
tf.app.flags.DEFINE_string("model_dir", '', "model check point dir")
tf.app.flags.DEFINE_string("servable_model_dir", '', "export servable model for TensorFlow Serving")
tf.app.flags.DEFINE_string("task_type", 'train', "task type {train, infer, eval, export}")
tf.app.flags.DEFINE_boolean("clear_existing_model", False, "clear existing model or not")
#40362692,0,0,216:9342395:1.0 301:9351665:1.0 205:7702673:1.0 206:8317829:1.0 207:8967741:1.0 508:9356012:2.30259 210:9059239:1.0 210:9042796:1.0 210:9076972:1.0 210:9103884:1.0 210:9063064:1.0 127_14:3529789:2.3979 127_14:3806412:2.70805
def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
print('Parsing', filenames)
def _parse_fn(record):
features = {
"y": tf.FixedLenFeature([], tf.float32),
"z": tf.FixedLenFeature([], tf.float32),
"ids": tf.FixedLenFeature([FLAGS.field_size], tf.int64),
"app_list": tf.VarLenFeature(tf.int64),
"level2_list": tf.VarLenFeature(tf.int64),
"level3_list": tf.VarLenFeature(tf.int64),
"tag1_list": tf.VarLenFeature(tf.int64),
"tag2_list": tf.VarLenFeature(tf.int64),
"tag3_list": tf.VarLenFeature(tf.int64),
"tag4_list": tf.VarLenFeature(tf.int64),
"tag5_list": tf.VarLenFeature(tf.int64),
"tag6_list": tf.VarLenFeature(tf.int64),
"tag7_list": tf.VarLenFeature(tf.int64)
}
parsed = tf.parse_single_example(record, features)
y = parsed.pop('y')
z = parsed.pop('z')
return parsed, {"y": y, "z": z}
# Extract lines from input files using the Dataset API, can pass one filename or filename list
dataset = tf.data.TFRecordDataset(filenames).map(_parse_fn, num_parallel_calls=10).prefetch(500000) # multi-thread pre-process then prefetch
# Randomizes input using a window of 256 elements (read into memory)
if perform_shuffle:
dataset = dataset.shuffle(buffer_size=256)
# epochs from blending together.
dataset = dataset.repeat(num_epochs)
dataset = dataset.batch(batch_size) # Batch size to use
# dataset = dataset.padded_batch(batch_size, padded_shapes=({"feeds_ids": [None], "feeds_vals": [None], "title_ids": [None]}, [None])) #不定长补齐
#return dataset.make_one_shot_iterator()
iterator = dataset.make_one_shot_iterator()
batch_features, batch_labels = iterator.get_next()
#return tf.reshape(batch_ids,shape=[-1,field_size]), tf.reshape(batch_vals,shape=[-1,field_size]), batch_labels
#print("-"*100)
#print(batch_features,batch_labels)
return batch_features, batch_labels
def model_fn(features, labels, mode, params):
"""Bulid Model function f(x) for Estimator."""
#------hyperparameters----
field_size = params["field_size"]
feature_size = params["feature_size"]
embedding_size = params["embedding_size"]
l2_reg = params["l2_reg"]
learning_rate = params["learning_rate"]
#optimizer = params["optimizer"]
layers = list(map(int, params["deep_layers"].split(',')))
dropout = list(map(float, params["dropout"].split(',')))
ctr_task_wgt = params["ctr_task_wgt"]
common_dims = field_size*embedding_size
#------bulid weights------
Feat_Emb = tf.get_variable(name='embeddings', shape=[feature_size, embedding_size], initializer=tf.glorot_normal_initializer())
feat_ids = features['ids']
app_list = features['app_list']
level2_list = features['level2_list']
level3_list = features['level3_list']
tag1_list = features['tag1_list']
tag2_list = features['tag2_list']
tag3_list = features['tag3_list']
tag4_list = features['tag4_list']
tag5_list = features['tag5_list']
tag6_list = features['tag6_list']
tag7_list = features['tag7_list']
if FLAGS.task_type != "infer":
y = labels['y']
z = labels['z']
#------build f(x)------
with tf.variable_scope("Shared-Embedding-layer"):
embedding_id = tf.nn.embedding_lookup(Feat_Emb,feat_ids)
app_id = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=app_list, sp_weights=None, combiner="sum")
level2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=level2_list, sp_weights=None, combiner="sum")
level3 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=level3_list, sp_weights=None, combiner="sum")
tag1 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag1_list, sp_weights=None, combiner="sum")
tag2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag2_list, sp_weights=None, combiner="sum")
tag3 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag3_list, sp_weights=None, combiner="sum")
tag4 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag4_list, sp_weights=None, combiner="sum")
tag5 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag5_list, sp_weights=None, combiner="sum")
tag6 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag6_list, sp_weights=None, combiner="sum")
tag7 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag7_list, sp_weights=None, combiner="sum")
# x_concat = tf.reshape(embedding_id,shape=[-1, common_dims]) # None * (F * K)
x_concat = tf.concat([tf.reshape(embedding_id, shape=[-1, common_dims]), app_id, level2, level3, tag1,
tag2, tag3, tag4, tag5, tag6, tag7], axis=1)
with tf.name_scope("CVR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN:
train_phase = True
else:
train_phase = False
x_cvr = x_concat
for i in range(len(layers)):
x_cvr = tf.contrib.layers.fully_connected(inputs=x_cvr, num_outputs=layers[i], \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='cvr_mlp%d' % i)
if FLAGS.batch_norm:
x_cvr = batch_norm_layer(x_cvr, train_phase=train_phase, scope_bn='cvr_bn_%d' %i) #放在RELU之后 https://github.com/ducha-aiki/caffenet-benchmark/blob/master/batchnorm.md#bn----before-or-after-relu
if mode == tf.estimator.ModeKeys.TRAIN:
x_cvr = tf.nn.dropout(x_cvr, keep_prob=dropout[i]) #Apply Dropout after all BN layers and set dropout=0.8(drop_ratio=0.2)
y_cvr = tf.contrib.layers.fully_connected(inputs=x_cvr, num_outputs=1, activation_fn=tf.identity, \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='cvr_out')
y_cvr = tf.reshape(y_cvr,shape=[-1])
with tf.name_scope("CTR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN:
train_phase = True
else:
train_phase = False
x_ctr = x_concat
for i in range(len(layers)):
x_ctr = tf.contrib.layers.fully_connected(inputs=x_ctr, num_outputs=layers[i], \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='ctr_mlp%d' % i)
if FLAGS.batch_norm:
x_ctr = batch_norm_layer(x_ctr, train_phase=train_phase, scope_bn='ctr_bn_%d' %i) #放在RELU之后 https://github.com/ducha-aiki/caffenet-benchmark/blob/master/batchnorm.md#bn----before-or-after-relu
if mode == tf.estimator.ModeKeys.TRAIN:
x_ctr = tf.nn.dropout(x_ctr, keep_prob=dropout[i]) #Apply Dropout after all BN layers and set dropout=0.8(drop_ratio=0.2)
y_ctr = tf.contrib.layers.fully_connected(inputs=x_ctr, num_outputs=1, activation_fn=tf.identity, \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='ctr_out')
y_ctr = tf.reshape(y_ctr,shape=[-1])
with tf.variable_scope("MTL-Layer"):
pctr = tf.sigmoid(y_ctr)
pcvr = tf.sigmoid(y_cvr)
pctcvr = pctr*pcvr
predictions={"pcvr": pcvr, "pctr": pctr, "pctcvr": pctcvr}
export_outputs = {tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: tf.estimator.export.PredictOutput(predictions)}
# Provide an estimator spec for `ModeKeys.PREDICT`
if mode == tf.estimator.ModeKeys.PREDICT:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
export_outputs=export_outputs)
if FLAGS.task_type != "infer":
#------bulid loss------
ctr_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y_ctr, labels=y))
#cvr_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y_ctcvr, labels=z))
cvr_loss = tf.reduce_mean(tf.losses.log_loss(predictions=pctcvr, labels=z))
loss = ctr_task_wgt * ctr_loss + (1 -ctr_task_wgt) * cvr_loss + l2_reg * tf.nn.l2_loss(Feat_Emb)
tf.summary.scalar('ctr_loss', ctr_loss)
tf.summary.scalar('cvr_loss', cvr_loss)
# Provide an estimator spec for `ModeKeys.EVAL`
eval_metric_ops = {
"CTR_AUC": tf.metrics.auc(y, pctr),
#"CTR_F1": tf.contrib.metrics.f1_score(y,pctr),
#"CTR_Precision": tf.metrics.precision(y,pctr),
#"CTR_Recall": tf.metrics.recall(y,pctr),
"CVR_AUC": tf.metrics.auc(z, pcvr),
"CTCVR_AUC": tf.metrics.auc(z, pctcvr)
}
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
loss=loss,
eval_metric_ops=eval_metric_ops)
#------bulid optimizer------
if FLAGS.optimizer == 'Adam':
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate, beta1=0.9, beta2=0.999, epsilon=1e-8)
elif FLAGS.optimizer == 'Adagrad':
optimizer = tf.train.AdagradOptimizer(learning_rate=learning_rate, initial_accumulator_value=1e-8)
elif FLAGS.optimizer == 'Momentum':
optimizer = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.95)
elif FLAGS.optimizer == 'ftrl':
optimizer = tf.train.FtrlOptimizer(learning_rate)
train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
# Provide an estimator spec for `ModeKeys.TRAIN` modes
if mode == tf.estimator.ModeKeys.TRAIN:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
loss=loss,
train_op=train_op)
def batch_norm_layer(x, train_phase, scope_bn):
bn_train = tf.contrib.layers.batch_norm(x, decay=FLAGS.batch_norm_decay, center=True, scale=True, updates_collections=None, is_training=True, reuse=None, scope=scope_bn)
bn_infer = tf.contrib.layers.batch_norm(x, decay=FLAGS.batch_norm_decay, center=True, scale=True, updates_collections=None, is_training=False, reuse=True, scope=scope_bn)
z = tf.cond(tf.cast(train_phase, tf.bool), lambda: bn_train, lambda: bn_infer)
return z
def set_dist_env():
if FLAGS.dist_mode == 1: # 本地分布式测试模式1 chief, 1 ps, 1 evaluator
ps_hosts = FLAGS.ps_hosts.split(',')
chief_hosts = FLAGS.chief_hosts.split(',')
task_index = FLAGS.task_index
job_name = FLAGS.job_name
print('ps_host', ps_hosts)
print('chief_hosts', chief_hosts)
print('job_name', job_name)
print('task_index', str(task_index))
# 无worker参数
tf_config = {
'cluster': {'chief': chief_hosts, 'ps': ps_hosts},
'task': {'type': job_name, 'index': task_index }
}
print(json.dumps(tf_config))
os.environ['TF_CONFIG'] = json.dumps(tf_config)
elif FLAGS.dist_mode == 2: # 集群分布式模式
ps_hosts = FLAGS.ps_hosts.split(',')
worker_hosts = FLAGS.worker_hosts.split(',')
chief_hosts = worker_hosts[0:1] # get first worker as chief
worker_hosts = worker_hosts[2:] # the rest as worker
task_index = FLAGS.task_index
job_name = FLAGS.job_name
print('ps_host', ps_hosts)
print('worker_host', worker_hosts)
print('chief_hosts', chief_hosts)
print('job_name', job_name)
print('task_index', str(task_index))
# use #worker=0 as chief
if job_name == "worker" and task_index == 0:
job_name = "chief"
# use #worker=1 as evaluator
if job_name == "worker" and task_index == 1:
job_name = 'evaluator'
task_index = 0
# the others as worker
if job_name == "worker" and task_index > 1:
task_index -= 2
tf_config = {
'cluster': {'chief': chief_hosts, 'worker': worker_hosts, 'ps': ps_hosts},
'task': {'type': job_name, 'index': task_index }
}
print(json.dumps(tf_config))
os.environ['TF_CONFIG'] = json.dumps(tf_config)
def main(_):
#------check Arguments------
if FLAGS.dt_dir == "":
FLAGS.dt_dir = (date.today() + timedelta(-1)).strftime('%Y%m%d')
FLAGS.model_dir = FLAGS.model_dir + FLAGS.dt_dir
#FLAGS.data_dir = FLAGS.data_dir + FLAGS.dt_dir
print('task_type ', FLAGS.task_type)
print('model_dir ', FLAGS.model_dir)
print('hdfs_dir ', FLAGS.hdfs_dir)
print('local_dir ', FLAGS.local_dir)
print('dt_dir ', FLAGS.dt_dir)
print('num_epochs ', FLAGS.num_epochs)
print('feature_size ', FLAGS.feature_size)
print('field_size ', FLAGS.field_size)
print('embedding_size ', FLAGS.embedding_size)
print('batch_size ', FLAGS.batch_size)
print('deep_layers ', FLAGS.deep_layers)
print('dropout ', FLAGS.dropout)
print('loss_type ', FLAGS.loss_type)
print('optimizer ', FLAGS.optimizer)
print('learning_rate ', FLAGS.learning_rate)
print('l2_reg ', FLAGS.l2_reg)
print('ctr_task_wgt ', FLAGS.ctr_task_wgt)
#------init Envs------
# tr_files = [path+"tr/part-r-00000"]
# va_files = [path+"va/part-r-00000"]
# te_files = ["%s/part-r-00000" % FLAGS.hdfs_dir]
tr_files = get_filename("/strategy/esmm/tr")
va_files = get_filename("/strategy/esmm/va")
te_files = ["%s/part-r-00000" % FLAGS.hdfs_dir]
if FLAGS.clear_existing_model:
try:
shutil.rmtree(FLAGS.model_dir)
except Exception as e:
print(e, "at clear_existing_model")
else:
print("existing model cleaned at %s" % FLAGS.model_dir)
set_dist_env()
#------bulid Tasks------
model_params = {
"field_size": FLAGS.field_size,
"feature_size": FLAGS.feature_size,
"embedding_size": FLAGS.embedding_size,
"learning_rate": FLAGS.learning_rate,
"l2_reg": FLAGS.l2_reg,
"deep_layers": FLAGS.deep_layers,
"dropout": FLAGS.dropout,
"ctr_task_wgt":FLAGS.ctr_task_wgt
}
config = tf.estimator.RunConfig().replace(session_config = tf.ConfigProto(device_count={'GPU':0, 'CPU':FLAGS.num_threads}),
log_step_count_steps=FLAGS.log_steps, save_summary_steps=FLAGS.log_steps)
Estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir=FLAGS.model_dir, params=model_params, config=config)
if FLAGS.task_type == 'train':
train_spec = tf.estimator.TrainSpec(input_fn=lambda: input_fn(tr_files, num_epochs=FLAGS.num_epochs, batch_size=FLAGS.batch_size))
eval_spec = tf.estimator.EvalSpec(input_fn=lambda: input_fn(va_files, num_epochs=1, batch_size=FLAGS.batch_size), steps=None, start_delay_secs=1000, throttle_secs=1200)
result = tf.estimator.train_and_evaluate(Estimator, train_spec, eval_spec)
for key,value in sorted(result[0].items()):
print('%s: %s' % (key,value))
elif FLAGS.task_type == 'eval':
result = Estimator.evaluate(input_fn=lambda: input_fn(va_files, num_epochs=1, batch_size=FLAGS.batch_size))
for key,value in sorted(result.items()):
print('%s: %s' % (key,value))
elif FLAGS.task_type == 'infer':
preds = Estimator.predict(input_fn=lambda: input_fn(te_files, num_epochs=1, batch_size=FLAGS.batch_size), predict_keys=["pctcvr","pctr","pcvr"])
with open(FLAGS.local_dir+"/pred.txt", "w") as fo:
print("-"*100)
with open(FLAGS.local_dir + "/pred.txt", "w") as fo:
for prob in preds:
fo.write("%f\t%f\t%f\n" % (prob['pctr'], prob['pcvr'], prob['pctcvr']))
elif FLAGS.task_type == 'export':
print("Not Implemented, Do It Yourself!")
def get_filename(dir_in):
pre_path = "hdfs://172.16.32.4:8020"
args = "hdfs dfs -ls " + dir_in + " | awk '{print $8}'"
proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
s_output, s_err = proc.communicate()
all_dart_dirs = s_output.split()
a = []
for i in all_dart_dirs:
b = str(i).split("/")[4]
if b[:4] == "part":
tmp = pre_path + str(i)[2:-1]
a.append(tmp)
return a
if __name__ == "__main__":
# classpath = "$CLASSPATH:%JAVA_HOME%/lib/dt.jar:%JAVA_HOME%/lib/tools.jar:$(/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/api-asn1-api-1.0.0-M20.jar:/opt/hadoop/share/hadoop/common/lib/hadoop-annotations-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/common/lib/activation-1.1.jar:/opt/hadoop/share/hadoop/common/lib/commons-codec-1.4.jar:/opt/hadoop/share/hadoop/common/lib/jasper-runtime-5.5.23.jar:/opt/hadoop/share/hadoop/common/lib/jsch-0.1.42.jar:/opt/hadoop/share/hadoop/common/lib/hadoop-auth-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/common/lib/jsp-api-2.1.jar:/opt/hadoop/share/hadoop/common/lib/asm-3.2.jar:/opt/hadoop/share/hadoop/common/lib/commons-lang-2.6.jar:/opt/hadoop/share/hadoop/common/lib/commons-beanutils-1.9.2.jar:/opt/hadoop/share/hadoop/common/lib/zookeeper-3.4.5-cdh5.16.1.jar:/opt/hadoop/share/hadoop/common/lib/api-util-1.0.0-M20.jar:/opt/hadoop/share/hadoop/common/lib/jetty-util-6.1.26.cloudera.4.jar:/opt/hadoop/share/hadoop/common/lib/snappy-java-1.0.4.1.jar:/opt/hadoop/share/hadoop/common/lib/guava-11.0.2.jar:/opt/hadoop/share/hadoop/common/lib/apacheds-kerberos-codec-2.0.0-M15.jar:/opt/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar:/opt/hadoop/share/hadoop/common/lib/servlet-api-2.5.jar:/opt/hadoop/share/hadoop/common/lib/commons-collections-3.2.2.jar:/opt/hadoop/share/hadoop/common/lib/jersey-server-1.9.jar:/opt/hadoop/share/hadoop/common/lib/commons-digester-1.8.jar:/opt/hadoop/share/hadoop/common/lib/jasper-compiler-5.5.23.jar:/opt/hadoop/share/hadoop/common/lib/java-xmlbuilder-0.4.jar:/opt/hadoop/share/hadoop/common/lib/curator-client-2.7.1.jar:/opt/hadoop/share/hadoop/common/lib/commons-logging-1.1.3.jar:/opt/hadoop/share/hadoop/common/lib/jackson-jaxrs-1.8.10.jar:/opt/hadoop/share/hadoop/common/lib/jaxb-impl-2.2.3-1.jar:/opt/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar:/opt/hadoop/share/hadoop/common/lib/gson-2.2.4.jar:/opt/hadoop/share/hadoop/common/lib/commons-configuration-1.6.jar:/opt/hadoop/share/hadoop/common/lib/commons-httpclient-3.1.jar:/opt/hadoop/share/hadoop/common/lib/hamcrest-core-1.3.jar:/opt/hadoop/share/hadoop/common/lib/httpclient-4.2.5.jar:/opt/hadoop/share/hadoop/common/lib/jets3t-0.9.0.jar:/opt/hadoop/share/hadoop/common/lib/xmlenc-0.52.jar:/opt/hadoop/share/hadoop/common/lib/logredactor-1.0.3.jar:/opt/hadoop/share/hadoop/common/lib/slf4j-api-1.7.5.jar:/opt/hadoop/share/hadoop/common/lib/htrace-core4-4.0.1-incubating.jar:/opt/hadoop/share/hadoop/common/lib/curator-recipes-2.7.1.jar:/opt/hadoop/share/hadoop/common/lib/apacheds-i18n-2.0.0-M15.jar:/opt/hadoop/share/hadoop/common/lib/jsr305-3.0.0.jar:/opt/hadoop/share/hadoop/common/lib/log4j-1.2.17.jar:/opt/hadoop/share/hadoop/common/lib/xz-1.0.jar:/opt/hadoop/share/hadoop/common/lib/junit-4.11.jar:/opt/hadoop/share/hadoop/common/lib/jaxb-api-2.2.2.jar:/opt/hadoop/share/hadoop/common/lib/commons-beanutils-core-1.8.0.jar:/opt/hadoop/share/hadoop/common/lib/commons-compress-1.4.1.jar:/opt/hadoop/share/hadoop/common/lib/commons-net-3.1.jar:/opt/hadoop/share/hadoop/common/lib/jersey-json-1.9.jar:/opt/hadoop/share/hadoop/common/lib/stax-api-1.0-2.jar:/opt/hadoop/share/hadoop/common/lib/commons-el-1.0.jar:/opt/hadoop/share/hadoop/common/lib/mockito-all-1.8.5.jar:/opt/hadoop/share/hadoop/common/lib/jetty-6.1.26.cloudera.4.jar:/opt/hadoop/share/hadoop/common/lib/jettison-1.1.jar:/opt/hadoop/share/hadoop/common/lib/protobuf-java-2.5.0.jar:/opt/hadoop/share/hadoop/common/lib/avro-1.7.6-cdh5.16.1.jar:/opt/hadoop/share/hadoop/common/lib/httpcore-4.2.5.jar:/opt/hadoop/share/hadoop/common/lib/commons-io-2.4.jar:/opt/hadoop/share/hadoop/common/lib/netty-3.10.5.Final.jar:/opt/hadoop/share/hadoop/common/lib/paranamer-2.3.jar:/opt/hadoop/share/hadoop/common/lib/curator-framework-2.7.1.jar:/opt/hadoop/share/hadoop/common/lib/jackson-xc-1.8.10.jar:/opt/hadoop/share/hadoop/common/lib/commons-math3-3.1.1.jar:/opt/hadoop/share/hadoop/common/lib/jersey-core-1.9.jar:/opt/hadoop/share/hadoop/common/hadoop-common-2.6.0-cdh5.16.1-tests.jar:/opt/hadoop/share/hadoop/common/hadoop-nfs-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/common/hadoop-common-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/commons-codec-1.4.jar:/opt/hadoop/share/hadoop/hdfs/lib/jasper-runtime-5.5.23.jar:/opt/hadoop/share/hadoop/hdfs/lib/jsp-api-2.1.jar:/opt/hadoop/share/hadoop/hdfs/lib/asm-3.2.jar:/opt/hadoop/share/hadoop/hdfs/lib/commons-lang-2.6.jar:/opt/hadoop/share/hadoop/hdfs/lib/jetty-util-6.1.26.cloudera.4.jar:/opt/hadoop/share/hadoop/hdfs/lib/guava-11.0.2.jar:/opt/hadoop/share/hadoop/hdfs/lib/xml-apis-1.3.04.jar:/opt/hadoop/share/hadoop/hdfs/lib/commons-cli-1.2.jar:/opt/hadoop/share/hadoop/hdfs/lib/servlet-api-2.5.jar:/opt/hadoop/share/hadoop/hdfs/lib/commons-daemon-1.0.13.jar:/opt/hadoop/share/hadoop/hdfs/lib/jersey-server-1.9.jar:/opt/hadoop/share/hadoop/hdfs/lib/jackson-core-asl-1.8.10.jar:/opt/hadoop/share/hadoop/hdfs/lib/commons-logging-1.1.3.jar:/opt/hadoop/share/hadoop/hdfs/lib/jackson-mapper-asl-1.8.10-cloudera.1.jar:/opt/hadoop/share/hadoop/hdfs/lib/xmlenc-0.52.jar:/opt/hadoop/share/hadoop/hdfs/lib/htrace-core4-4.0.1-incubating.jar:/opt/hadoop/share/hadoop/hdfs/lib/jsr305-3.0.0.jar:/opt/hadoop/share/hadoop/hdfs/lib/log4j-1.2.17.jar:/opt/hadoop/share/hadoop/hdfs/lib/xercesImpl-2.9.1.jar:/opt/hadoop/share/hadoop/hdfs/lib/commons-el-1.0.jar:/opt/hadoop/share/hadoop/hdfs/lib/jetty-6.1.26.cloudera.4.jar:/opt/hadoop/share/hadoop/hdfs/lib/protobuf-java-2.5.0.jar:/opt/hadoop/share/hadoop/hdfs/lib/commons-io-2.4.jar:/opt/hadoop/share/hadoop/hdfs/lib/leveldbjni-all-1.8.jar:/opt/hadoop/share/hadoop/hdfs/lib/netty-3.10.5.Final.jar:/opt/hadoop/share/hadoop/hdfs/lib/jersey-core-1.9.jar:/opt/hadoop/share/hadoop/hdfs/hadoop-hdfs-nfs-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.16.1-tests.jar:/opt/hadoop/share/hadoop/yarn/lib/activation-1.1.jar:/opt/hadoop/share/hadoop/yarn/lib/commons-codec-1.4.jar:/opt/hadoop/share/hadoop/yarn/lib/aopalliance-1.0.jar:/opt/hadoop/share/hadoop/yarn/lib/asm-3.2.jar:/opt/hadoop/share/hadoop/yarn/lib/commons-lang-2.6.jar:/opt/hadoop/share/hadoop/yarn/lib/zookeeper-3.4.5-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/lib/guice-3.0.jar:/opt/hadoop/share/hadoop/yarn/lib/jetty-util-6.1.26.cloudera.4.jar:/opt/hadoop/share/hadoop/yarn/lib/guava-11.0.2.jar:/opt/hadoop/share/hadoop/yarn/lib/commons-cli-1.2.jar:/opt/hadoop/share/hadoop/yarn/lib/servlet-api-2.5.jar:/opt/hadoop/share/hadoop/yarn/lib/commons-collections-3.2.2.jar:/opt/hadoop/share/hadoop/yarn/lib/jersey-server-1.9.jar:/opt/hadoop/share/hadoop/yarn/lib/jackson-core-asl-1.8.10.jar:/opt/hadoop/share/hadoop/yarn/lib/commons-logging-1.1.3.jar:/opt/hadoop/share/hadoop/yarn/lib/jackson-jaxrs-1.8.10.jar:/opt/hadoop/share/hadoop/yarn/lib/jaxb-impl-2.2.3-1.jar:/opt/hadoop/share/hadoop/yarn/lib/jline-2.11.jar:/opt/hadoop/share/hadoop/yarn/lib/jackson-mapper-asl-1.8.10-cloudera.1.jar:/opt/hadoop/share/hadoop/yarn/lib/jersey-guice-1.9.jar:/opt/hadoop/share/hadoop/yarn/lib/jsr305-3.0.0.jar:/opt/hadoop/share/hadoop/yarn/lib/log4j-1.2.17.jar:/opt/hadoop/share/hadoop/yarn/lib/xz-1.0.jar:/opt/hadoop/share/hadoop/yarn/lib/javax.inject-1.jar:/opt/hadoop/share/hadoop/yarn/lib/jaxb-api-2.2.2.jar:/opt/hadoop/share/hadoop/yarn/lib/commons-compress-1.4.1.jar:/opt/hadoop/share/hadoop/yarn/lib/jersey-json-1.9.jar:/opt/hadoop/share/hadoop/yarn/lib/stax-api-1.0-2.jar:/opt/hadoop/share/hadoop/yarn/lib/jetty-6.1.26.cloudera.4.jar:/opt/hadoop/share/hadoop/yarn/lib/jettison-1.1.jar:/opt/hadoop/share/hadoop/yarn/lib/protobuf-java-2.5.0.jar:/opt/hadoop/share/hadoop/yarn/lib/guice-servlet-3.0.jar:/opt/hadoop/share/hadoop/yarn/lib/commons-io-2.4.jar:/opt/hadoop/share/hadoop/yarn/lib/leveldbjni-all-1.8.jar:/opt/hadoop/share/hadoop/yarn/lib/jersey-client-1.9.jar:/opt/hadoop/share/hadoop/yarn/lib/jackson-xc-1.8.10.jar:/opt/hadoop/share/hadoop/yarn/lib/jersey-core-1.9.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-client-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-server-web-proxy-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-server-common-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-server-tests-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-server-nodemanager-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-api-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-registry-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-server-applicationhistoryservice-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-common-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-server-resourcemanager-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/yarn/hadoop-yarn-applications-unmanaged-am-launcher-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/lib/hadoop-annotations-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/lib/aopalliance-1.0.jar:/opt/hadoop/share/hadoop/mapreduce/lib/asm-3.2.jar:/opt/hadoop/share/hadoop/mapreduce/lib/guice-3.0.jar:/opt/hadoop/share/hadoop/mapreduce/lib/snappy-java-1.0.4.1.jar:/opt/hadoop/share/hadoop/mapreduce/lib/jersey-server-1.9.jar:/opt/hadoop/share/hadoop/mapreduce/lib/jackson-core-asl-1.8.10.jar:/opt/hadoop/share/hadoop/mapreduce/lib/jackson-mapper-asl-1.8.10-cloudera.1.jar:/opt/hadoop/share/hadoop/mapreduce/lib/hamcrest-core-1.3.jar:/opt/hadoop/share/hadoop/mapreduce/lib/jersey-guice-1.9.jar:/opt/hadoop/share/hadoop/mapreduce/lib/log4j-1.2.17.jar:/opt/hadoop/share/hadoop/mapreduce/lib/xz-1.0.jar:/opt/hadoop/share/hadoop/mapreduce/lib/junit-4.11.jar:/opt/hadoop/share/hadoop/mapreduce/lib/javax.inject-1.jar:/opt/hadoop/share/hadoop/mapreduce/lib/commons-compress-1.4.1.jar:/opt/hadoop/share/hadoop/mapreduce/lib/protobuf-java-2.5.0.jar:/opt/hadoop/share/hadoop/mapreduce/lib/avro-1.7.6-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/lib/guice-servlet-3.0.jar:/opt/hadoop/share/hadoop/mapreduce/lib/commons-io-2.4.jar:/opt/hadoop/share/hadoop/mapreduce/lib/leveldbjni-all-1.8.jar:/opt/hadoop/share/hadoop/mapreduce/lib/netty-3.10.5.Final.jar:/opt/hadoop/share/hadoop/mapreduce/lib/paranamer-2.3.jar:/opt/hadoop/share/hadoop/mapreduce/lib/jersey-core-1.9.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-hs-plugins-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-app-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-shuffle-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-nativetask-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.6.0-cdh5.16.1-tests.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-hs-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0-cdh5.16.1.jar:/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.6.0-cdh5.16.1.jar)"
# w
a = "export CLASSPATH='$(hadoop classpath --glob)'"
os.system(a)
path = "hdfs://172.16.32.4:8020/strategy/esmm/"
tf.logging.set_verbosity(tf.logging.INFO)
tf.app.run()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment