Commit f8359d52 authored by 张彦钊

change test flow

parent a8f36280
@@ -139,7 +139,7 @@ def feature_engineer():
     validate_date = con_sql(db, sql)[0].values.tolist()[0]
     print("validate_date:" + validate_date)
     temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
-    start = (temp - datetime.timedelta(days=6)).strftime("%Y-%m-%d")
+    start = (temp - datetime.timedelta(days=100)).strftime("%Y-%m-%d")
     print(start)
     db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC')
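This hunk widens the training window: `start` now falls 100 days before `validate_date` instead of 6, so roughly three months of history feed the training set. A minimal standalone sketch of the date arithmetic (the example date is hypothetical; the real script reads `validate_date` from MySQL):

```python
import datetime

validate_date = "2019-04-08"  # hypothetical; the script fetches this from the DB
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=100)).strftime("%Y-%m-%d")
print(start)  # 2018-12-29
```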
@@ -204,7 +204,7 @@ def feature_engineer():
                            value_map[x[17]], value_map[x[18]], value_map[x[19]], value_map[x[20]], value_map[x[21]],
                            value_map[x[22]], value_map[x[23]], value_map[x[24]], value_map[x[25]], value_map[x[26]]]))
-    rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
+    rdd.persist(storageLevel=StorageLevel.MEMORY_ONLY_SER)
     # TODO: after launch, remove the train filter below, because the most recent day's data should also be part of the training set
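Both persist calls move from MEMORY_AND_DISK to MEMORY_ONLY_SER: cached partitions stay serialized in memory only, and partitions that do not fit are recomputed on demand rather than spilled to disk. A minimal sketch of the pattern with hypothetical data (note that PySpark pickles RDD elements regardless of level, and some Spark versions expose only the non-`_SER` level names in Python):

```python
from pyspark import SparkContext, StorageLevel

sc = SparkContext(appName="persist_demo")
rdd = sc.parallelize(range(1000000)).map(lambda x: (x % 10, x))

# Serialized, memory-only cache: smaller footprint than MEMORY_AND_DISK,
# but evicted partitions are recomputed instead of read back from disk.
rdd.persist(storageLevel=StorageLevel.MEMORY_ONLY_SER)
print(rdd.count())       # first action materializes the cache
print(rdd.countByKey())  # later actions reuse the cached partitions
```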
@@ -267,8 +267,6 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
     df = spark.sql(sql)
     df = df.drop_duplicates(["ucity_id", "device_id", "cid_id"])
-    print("pre test")
-    print(df.count())
     df = df.na.fill(dict(zip(features, features)))
     f = time.time()
     rdd = df.select("label", "y", "z", "ucity_id", "device_id", "cid_id", "app_list", "level2_ids", "level3_ids",
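Just below the removed debug prints, `na.fill(dict(zip(features, features)))` fills each feature column's nulls with that column's own name, a per-column sentinel string that later maps to a stable slot in `value_map`. A small sketch of the idiom (column names and rows are hypothetical stand-ins for the real feature list):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("na_fill_demo").getOrCreate()
features = ["ucity_id", "ccity_name"]  # stand-in for the real feature list
df = spark.createDataFrame([("beijing", None), (None, "shanghai")], features)

# dict(zip(features, features)) == {"ucity_id": "ucity_id", "ccity_name": "ccity_name"},
# so every null becomes its own column's name, acting as an "unknown" sentinel.
filled = df.na.fill(dict(zip(features, features)))
filled.show()
# row 1: ("beijing", "ccity_name")
# row 2: ("ucity_id", "shanghai")
```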
@@ -292,21 +290,16 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
                            value_map.get(x[29], 15)
                            ]))
-    rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
+    rdd.persist(storageLevel=StorageLevel.MEMORY_ONLY_SER)
     native_pre = spark.createDataFrame(rdd.filter(lambda x: x[0] == 0).map(lambda x: (x[3], x[4], x[5]))) \
         .toDF("city", "uid", "cid_id")
     print("native csv")
     native_pre.toPandas().to_csv(local_path + "native.csv", header=True)
-    # TODO: change the csv write to the line below
-    # native_pre.coalesce(1).write.format('com.databricks.spark.csv').save(path+"native/",header = 'true')
+    # the prediction tfrecords must be written as a single file so that row order is preserved
     spark.createDataFrame(rdd.filter(lambda x: x[0] == 0)
                           .map(lambda x: (x[1], x[2], x[6], x[7], x[8], x[9], x[10], x[11], x[12], x[13], x[14], x[15], x[16]))) \
         .toDF("y", "z", "app_list", "level2_list", "level3_list", "tag1_list", "tag2_list", "tag3_list", "tag4_list",
-              "tag5_list", "tag6_list", "tag7_list", "ids").write.format("tfrecords") \
+              "tag5_list", "tag6_list", "tag7_list", "ids").repartition(1).write.format("tfrecords") \
         .save(path=path + "native/", mode="overwrite")
     print("native tfrecord done")
     h = time.time()
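`repartition(1)` collapses the frame to a single partition before the write, so the tfrecords output is exactly one file and its row order lines up with the companion native.csv. A minimal sketch, assuming the spark-tensorflow-connector package (which registers the "tfrecords" data source) is on the classpath; the frame and path are hypothetical:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("tfrecord_demo").getOrCreate()
df = spark.createDataFrame([(0.0, 1.0), (1.0, 0.0)], ["y", "z"])  # stand-in features

# One partition => one part file, so downstream inference can rely on
# record order matching the csv written from the same RDD.
df.repartition(1).write.format("tfrecords") \
    .save(path="hdfs:///tmp/demo/native/", mode="overwrite")
```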
@@ -316,14 +309,13 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
.toDF("city", "uid", "cid_id")
print("nearby csv")
native_pre.toPandas().to_csv(local_path + "nearby.csv", header=True)
# TODO 写成csv文件改成下面这样
# nearby_pre.coalesce(1).write.format('com.databricks.spark.csv').save(path+"nearby/",header = 'true')
spark.createDataFrame(rdd.filter(lambda x: x[0] == 1)
.map(
lambda x: (x[1], x[2], x[6], x[7], x[8], x[9], x[10], x[11], x[12], x[13], x[14], x[15], x[16]))) \
.toDF("y", "z", "app_list", "level2_list", "level3_list", "tag1_list", "tag2_list", "tag3_list", "tag4_list",
"tag5_list", "tag6_list", "tag7_list", "ids").write.format("tfrecords") \
"tag5_list", "tag6_list", "tag7_list", "ids").repartition(1).write.format("tfrecords") \
.save(path=path + "nearby/", mode="overwrite")
print("nearby tfrecord done")
@@ -19,11 +19,11 @@ echo "train..."
 ${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=10000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${LOCAL_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH} --hdfs_dir=${HDFS_PATH}/native --task_type=train > "$LOCAL_PATH/log/$b_train.log"
 echo "infer native..."
-${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=2000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${LOCAL_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH}/native --hdfs_dir=${HDFS_PATH}/native --task_type=infer > "$LOCAL_PATH/log/$b_native.log"
+${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=8000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${LOCAL_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH}/native --hdfs_dir=${HDFS_PATH}/native --task_type=infer > "$LOCAL_PATH/log/$b_native.log"
 echo "infer nearby..."
-${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=2000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${LOCAL_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH}/nearby --hdfs_dir=${HDFS_PATH}/nearby --task_type=infer > "$LOCAL_PATH/log/$b_nearby.log"
+${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=8000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${LOCAL_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH}/nearby --hdfs_dir=${HDFS_PATH}/nearby --task_type=infer > "$LOCAL_PATH/log/$b_nearby.log"
 echo "sort and 2sql"
 ${PYTHON_PATH} ${MODEL_PATH}/to_database.py > "$LOCAL_PATH/log/$b_insert.log"
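Both inference invocations raise --batch_size from 2000 to 8000, cutting the number of per-batch session runs during prediction at the cost of memory. A hedged, illustrative sketch of how such a flag typically reaches the input pipeline in an Estimator-style train.py (the flag wiring and function names here are assumptions, not the script's actual code):

```python
import tensorflow as tf  # TF 1.x style, matching the tf.app.flags idiom

tf.app.flags.DEFINE_integer("batch_size", 8000, "rows per inference batch")
FLAGS = tf.app.flags.FLAGS

def input_fn(filenames):
    # Larger batches mean fewer feed/fetch round trips per inference pass.
    dataset = tf.data.TFRecordDataset(filenames)
    dataset = dataset.batch(FLAGS.batch_size)
    return dataset
```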