Commit 9fbcf1a0 authored by 张彦钊's avatar 张彦钊

删除视频特征

parent 9bce9ba0
...@@ -159,7 +159,7 @@ def feature_engineer(): ...@@ -159,7 +159,7 @@ def feature_engineer():
sql = "select distinct recover_time from knowledge" sql = "select distinct recover_time from knowledge"
unique_values.extend(get_unique(db, sql)) unique_values.extend(get_unique(db, sql))
unique_values.append("video") # unique_values.append("video")
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_data_dwell" sql = "select max(stat_date) from esmm_train_data_dwell"
...@@ -179,7 +179,7 @@ def feature_engineer(): ...@@ -179,7 +179,7 @@ def feature_engineer():
"channel", "top", "time", "stat_date", "hospital_id", "channel", "top", "time", "stat_date", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time", "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time",
"app_list", "level3_ids", "level2_ids", "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7", "app_list", "level3_ids", "level2_ids", "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"search_tag2", "search_tag3","is_video"] "search_tag2", "search_tag3"]
unique_values.extend(features) unique_values.extend(features)
print("unique_values length") print("unique_values length")
print(len(unique_values)) print(len(unique_values))
...@@ -195,7 +195,7 @@ def feature_engineer(): ...@@ -195,7 +195,7 @@ def feature_engineer():
"wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \ "wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \
"ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7,doris.search_tag2,doris.search_tag3," \ "ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7,doris.search_tag2,doris.search_tag3," \
"k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time," \ "k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time," \
"e.device_id,e.cid_id,video.is_video " \ "e.device_id,e.cid_id " \
"from jerry_test.esmm_train_data_dwell e left join jerry_test.user_feature u on e.device_id = u.device_id " \ "from jerry_test.esmm_train_data_dwell e left join jerry_test.user_feature u on e.device_id = u.device_id " \
"left join jerry_test.cid_type_top c on e.device_id = c.device_id " \ "left join jerry_test.cid_type_top c on e.device_id = c.device_id " \
"left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \ "left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \
...@@ -212,14 +212,13 @@ def feature_engineer(): ...@@ -212,14 +212,13 @@ def feature_engineer():
"left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \ "left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \
"left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \ "left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \
"left join jerry_test.search_doris doris on e.device_id = doris.device_id and e.stat_date = doris.get_date " \ "left join jerry_test.search_doris doris on e.device_id = doris.device_id and e.stat_date = doris.get_date " \
"left join jerry_prod.diary_video video on e.cid_id = video.cid and e.stat_date = video.stat_date " \
"where e.stat_date >= '{}'".format(start) "where e.stat_date >= '{}'".format(start)
df = spark.sql(sql) df = spark.sql(sql)
df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer", df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids", "channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids",
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7","is_video"]) "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7"])
df = df.na.fill(dict(zip(features, features))) df = df.na.fill(dict(zip(features, features)))
...@@ -227,7 +226,7 @@ def feature_engineer(): ...@@ -227,7 +226,7 @@ def feature_engineer():
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7", "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time", "ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
"hospital_id", "treatment_method", "price_min", "price_max", "treatment_time", "hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
"maintain_time", "recover_time", "search_tag2", "search_tag3","is_video","cid_id","device_id")\ "maintain_time", "recover_time", "search_tag2", "search_tag3","cid_id","device_id")\
.rdd.repartition(200).map( .rdd.repartition(200).map(
lambda x: (x[0], float(x[1]), float(x[2]), app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map), lambda x: (x[0], float(x[1]), float(x[2]), app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map),
app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map), app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map),
...@@ -237,8 +236,8 @@ def feature_engineer(): ...@@ -237,8 +236,8 @@ def feature_engineer():
value_map.get(x[16], 5), value_map.get(x[17], 6), value_map.get(x[18], 7), value_map.get(x[19], 8), value_map.get(x[16], 5), value_map.get(x[17], 6), value_map.get(x[18], 7), value_map.get(x[19], 8),
value_map.get(x[20], 9), value_map.get(x[21], 10), value_map.get(x[20], 9), value_map.get(x[21], 10),
value_map.get(x[22], 11), value_map.get(x[23], 12), value_map.get(x[24], 13), value_map.get(x[22], 11), value_map.get(x[23], 12), value_map.get(x[24], 13),
value_map.get(x[25], 14), value_map.get(x[26], 15),value_map.get(x[29], 16)], value_map.get(x[25], 14), value_map.get(x[26], 15)],
app_list_func(x[27], leve2_map), app_list_func(x[28], leve3_map),x[13],x[30],x[31] app_list_func(x[27], leve2_map), app_list_func(x[28], leve3_map),x[13],x[29],x[30]
)) ))
...@@ -286,7 +285,7 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map): ...@@ -286,7 +285,7 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
"u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \ "u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \
"dl.app_list,e.hospital_id,feat.level3_ids," \ "dl.app_list,e.hospital_id,feat.level3_ids," \
"wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \ "wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \
"ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7,doris.search_tag2,doris.search_tag3,video.is_video," \ "ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7,doris.search_tag2,doris.search_tag3," \
"k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \ "k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \
"from jerry_test.esmm_pre_data e " \ "from jerry_test.esmm_pre_data e " \
"left join jerry_test.user_feature u on e.device_id = u.device_id " \ "left join jerry_test.user_feature u on e.device_id = u.device_id " \
...@@ -302,14 +301,13 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map): ...@@ -302,14 +301,13 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
"left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \ "left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \
"left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \ "left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \
"left join jerry_test.knowledge k on feat.level2 = k.level2_id " \ "left join jerry_test.knowledge k on feat.level2 = k.level2_id " \
"left join jerry_test.search_doris doris on e.device_id = doris.device_id and e.stat_date = doris.get_date " \ "left join jerry_test.search_doris doris on e.device_id = doris.device_id and e.stat_date = doris.get_date"
"left join jerry_prod.diary_video video on e.cid_id = video.cid and e.stat_date = video.stat_date"
features = ["ucity_id", "ccity_name", "device_type", "manufacturer", features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id", "channel", "top", "time", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time", "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time",
"app_list", "level3_ids", "level2_ids", "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7", "app_list", "level3_ids", "level2_ids", "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"search_tag2", "search_tag3","is_video"] "search_tag2", "search_tag3"]
df = spark.sql(sql) df = spark.sql(sql)
df = df.drop_duplicates(["ucity_id", "device_id", "cid_id"]) df = df.drop_duplicates(["ucity_id", "device_id", "cid_id"])
...@@ -320,7 +318,7 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map): ...@@ -320,7 +318,7 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7", "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time", "ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
"hospital_id", "treatment_method", "price_min", "price_max", "treatment_time", "hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
"maintain_time", "recover_time", "search_tag2", "search_tag3","is_video") \ "maintain_time", "recover_time", "search_tag2", "search_tag3") \
.rdd.repartition(200).map(lambda x: (x[0], float(x[1]), float(x[2]), x[3], x[4], x[5], .rdd.repartition(200).map(lambda x: (x[0], float(x[1]), float(x[2]), x[3], x[4], x[5],
app_list_func(x[6], app_list_map), app_list_func(x[7], leve2_map), app_list_func(x[6], app_list_map), app_list_func(x[7], leve2_map),
app_list_func(x[8], leve3_map), app_list_func(x[9], leve2_map), app_list_func(x[8], leve3_map), app_list_func(x[9], leve2_map),
...@@ -334,7 +332,7 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map): ...@@ -334,7 +332,7 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
value_map.get(x[23], 9), value_map.get(x[24], 10), value_map.get(x[23], 9), value_map.get(x[24], 10),
value_map.get(x[25], 11), value_map.get(x[26], 12), value_map.get(x[25], 11), value_map.get(x[26], 12),
value_map.get(x[27], 13), value_map.get(x[28], 14), value_map.get(x[27], 13), value_map.get(x[28], 14),
value_map.get(x[29], 15),value_map.get(x[32], 16)], value_map.get(x[29], 15)],
app_list_func(x[30], leve2_map),app_list_func(x[31], leve3_map))) app_list_func(x[30], leve2_map),app_list_func(x[31], leve3_map)))
...@@ -374,7 +372,6 @@ if __name__ == '__main__': ...@@ -374,7 +372,6 @@ if __name__ == '__main__':
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate() spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
ti = pti.TiContext(spark) ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test") ti.tidbMapDatabase("jerry_test")
ti.tidbMapDatabase("jerry_prod")
ti.tidbMapDatabase("eagle") ti.tidbMapDatabase("eagle")
spark.sparkContext.setLogLevel("WARN") spark.sparkContext.setLogLevel("WARN")
path = "hdfs:///strategy/esmm/" path = "hdfs:///strategy/esmm/"
......
...@@ -19,11 +19,11 @@ echo "rm model file" ...@@ -19,11 +19,11 @@ echo "rm model file"
b=`date +%Y%m%d` b=`date +%Y%m%d`
echo "train..." echo "train..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=10000 --field_size=16 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${HDFS_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH} --hdfs_dir=${HDFS_PATH}/native --task_type=train ${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=10000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${HDFS_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH} --hdfs_dir=${HDFS_PATH}/native --task_type=train
echo "infer native..." echo "infer native..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=10000 --field_size=16 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${HDFS_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH}/native --hdfs_dir=${HDFS_PATH}/native --task_type=infer ${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=10000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${HDFS_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH}/native --hdfs_dir=${HDFS_PATH}/native --task_type=infer
echo "infer nearby..." echo "infer nearby..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=10000 --field_size=16 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${HDFS_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH}/nearby --hdfs_dir=${HDFS_PATH}/nearby --task_type=infer ${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.5 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.3,0.3,0.3,0.3,0.3 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=10000 --field_size=15 --feature_size=600000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${HDFS_PATH}/model_ckpt/DeepCvrMTL/ --local_dir=${LOCAL_PATH}/nearby --hdfs_dir=${HDFS_PATH}/nearby --task_type=infer
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment