Commit a012f246 authored by 王志伟
parents d6411015 46f27a5c
@@ -46,4 +46,3 @@ ${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001
echo "sort and 2sql"
${PYTHON_PATH} ${MODEL_PATH}/to_database.py
-${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/send_mail.py
@@ -799,6 +799,229 @@ object GetDeviceDuration {
uid_duration_last.show()
sc.stop()
}
}
}
object EsmmDataTest {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = GmeiConfig.getMinusNDate(1)
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("EsmmDataTest")
    opt[String]("env")
      .text("the database environment to use")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text("the date to use")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.EsmmDataTest ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"| --env ${defaultParams.env}"
    )
  }
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "eagle", tableName = "src_mimas_prod_api_diary_tags")
      ti.tidbMapTable(dbName = "eagle", tableName = "src_zhengxing_api_tag")
      ti.tidbMapTable(dbName = "jerry_test", tableName = "diary_click")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure_precise")
      ti.tidbMapTable(dbName = "jerry_test", tableName = "train_data")
      val max_stat_date = sc.sql(
        s"""
           |select max(stat_date) from train_data
        """.stripMargin
      )
      val max_stat_date_str = max_stat_date.collect().map(s => s(0).toString).head
      println("max_stat_date_str", max_stat_date_str)
      println("param.date", param.date)

      if (max_stat_date_str == null || max_stat_date_str != param.date) {
        val stat_date = param.date
        println(stat_date)
        // val imp_data = sc.sql(
        //   s"""
        //      |select distinct stat_date,device_id,city_id as ucity_id,
        //      |  cid_id,diary_service_id
        //      |from data_feed_exposure
        //      |where cid_type = 'diary'
        //      |and stat_date ='${stat_date}'
        //   """.stripMargin
        // )
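
        // exposure log (precise), collapsed to one row per
        // (stat_date, device_id, ucity_id, cid_id, diary_service_id)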
        val imp_data = sc.sql(
          s"""
             |select * from
             |(select stat_date,device_id,city_id as ucity_id,cid_id,diary_service_id
             |from data_feed_exposure_precise
             |where cid_type = 'diary'
             |and stat_date ='${stat_date}'
             |group by stat_date,device_id,city_id,cid_id,diary_service_id) a
          """.stripMargin
        )
        // imp_data.show()
        // println("imp_data.count()")
        // println(imp_data.count())
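
        // diary click events for the same day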
        val clk_data = sc.sql(
          s"""
             |select distinct stat_date,device_id,city_id as ucity_id,
             |  cid_id,diary_service_id
             |from diary_click
             |where cid_type = 'diary'
             |and stat_date ='${stat_date}'
          """.stripMargin
        )
        // clk_data.show()
        // println("clk_data.count()")
        // println(clk_data.count())
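
        // ESMM labels: y = clicked, z = converted.
        // exposures that were never clicked are the pure negatives (y=0, z=0)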
        val imp_data_filter = imp_data.except(clk_data).withColumn("y", lit(0)).withColumn("z", lit(0))
        // imp_data_filter.createOrReplaceTempView("imp_data_filter")
        // imp_data_filter.show()
        // println("imp_data_filter.count()")
        // println(imp_data_filter.count())

        val stat_date_not = stat_date.replace("-", "")
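
        // conversions: a page_view on welfare_detail referred from diary_detail,
        // read from the event log, whose partitions use the yyyyMMdd form of the date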
        val cvr_data = sc.sql(
          s"""
             |select distinct
             |  from_unixtime(unix_timestamp(partition_date ,'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
             |  cl_id as device_id,city_id as ucity_id,
             |  params["referrer_id"] as cid_id,params["business_id"] as diary_service_id
             |from online.tl_hdfs_maidian_view
             |where action='page_view'
             |and partition_date ='${stat_date_not}'
             |and params['page_name'] = 'welfare_detail'
             |and params['referrer'] = 'diary_detail'
          """.stripMargin
        )
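
        // converted samples: y=1, z=1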
        val cvr_data_filter = cvr_data.withColumn("y", lit(1)).withColumn("z", lit(1))
        // cvr_data_filter.createOrReplaceTempView("cvr_data_filter")
        // cvr_data_filter.show()
        // println("cvr_data_filter.count()")
        // println(cvr_data_filter.count())
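
        // clicked but not converted: y=1, z=0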
        val clk_data_filter = clk_data.except(cvr_data).withColumn("y", lit(1)).withColumn("z", lit(0))
        // clk_data_filter.createOrReplaceTempView("clk_data_filter")
        // clk_data_filter.show()
        // println("clk_data_filter.count()")
        // println(clk_data_filter.count())
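
        // stack the three disjoint label groups into one training sample set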
        val union_data = imp_data_filter.union(clk_data_filter).union(cvr_data_filter)
        union_data.createOrReplaceTempView("union_data")
        // union_data.show()
        // println("union_data.count()")
        // println(union_data.count())
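
        // attach the diary's level-1 content tag (clevel1_id)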
        val union_data_clabel = sc.sql(
          s"""
             |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,
             |  c.level1_id as clevel1_id
             |from union_data a
             |left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id
             |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
             |where b.partition_date='${stat_date_not}'
             |and c.partition_date='${stat_date_not}'
          """.stripMargin
        )
        union_data_clabel.createOrReplaceTempView("union_data_clabel")
        // union_data_clabel.show()
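
        // attach the service's level-1 tag (slevel1_id)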
        val union_data_slabel = sc.sql(
          s"""
             |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,
             |  c.level1_id as slevel1_id
             |from union_data_clabel a
             |left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id
             |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
             |where b.partition_date='${stat_date_not}'
             |and c.partition_date='${stat_date_not}'
          """.stripMargin
        )
        union_data_slabel.createOrReplaceTempView("union_data_slabel")
        // union_data_slabel.show()
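
        // attach the diary's city name; tag_type = 4 marks city tags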
        val union_data_ccity_name = sc.sql(
          s"""
             |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
             |  c.name as ccity_name
             |from union_data_slabel a
             |left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id
             |left join src_zhengxing_api_tag c on b.tag_id=c.id
             |where c.tag_type=4
          """.stripMargin
        )
        union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name")
        // union_data_ccity_name.show()
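
        // resolve the service's city through service -> doctor -> hospital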
        val union_data_scity_id = sc.sql(
          s"""
             |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
             |  d.city_id as scity_id
             |from union_data_ccity_name a
             |left join online.tl_meigou_service_view b on a.diary_service_id=b.id
             |left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id
             |left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id
             |where b.partition_date='${stat_date_not}'
             |and c.partition_date='${stat_date_not}'
             |and d.partition_date='${stat_date_not}'
          """.stripMargin
        )
        union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
        union_data_scity_id.show()
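
        // dedupe to one row per (device_id, cid_id); first() keeps an arbitrary
        // value from each group for the remaining columns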
        val union_data_scity_id2 = sc.sql(
          s"""
             |select device_id,cid_id,first(stat_date) stat_date,first(ucity_id) ucity_id,first(diary_service_id) diary_service_id,first(y) y,
             |first(z) z,first(clevel1_id) clevel1_id,first(slevel1_id) slevel1_id,first(ccity_name) ccity_name,first(scity_id) scity_id
             |from union_data_scity_id
             |group by device_id,cid_id
          """.stripMargin
        )
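
        // append the day's samples to jerry_test.train_data in TiDB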
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="train_data",SaveMode.Append)
} else {
println("train_data already have param.date data")
}
sc.stop() sc.stop()
} }
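
A minimal, self-contained sketch of the labeling scheme used above, runnable with a local SparkSession; the object name and the tiny in-memory tables here are illustrative, not the production data:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

object EsmmLabelSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("esmm-label-sketch").getOrCreate()
    import spark.implicits._

    val imp = Seq(("d1", "c1"), ("d1", "c2"), ("d2", "c3")).toDF("device_id", "cid_id") // exposures
    val clk = Seq(("d1", "c1"), ("d2", "c3")).toDF("device_id", "cid_id")               // clicks
    val cvr = Seq(("d2", "c3")).toDF("device_id", "cid_id")                             // conversions

    // exposure-only -> (y=0, z=0); click-only -> (y=1, z=0); converted -> (y=1, z=1)
    val samples = imp.except(clk).withColumn("y", lit(0)).withColumn("z", lit(0))
      .union(clk.except(cvr).withColumn("y", lit(1)).withColumn("z", lit(0)))
      .union(cvr.withColumn("y", lit(1)).withColumn("z", lit(1)))

    samples.show() // one row per exposure, labeled for the CTR (y) and CTCVR (z) heads
    spark.stop()
  }
}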
...
@@ -109,7 +109,7 @@ def get_predict(date,value_map):
          "from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
          "left join cid_type_top c on e.device_id = c.device_id " \
          "left join cid_level2 cl on e.cid_id = cl.cid " \
"left join cid_time_cut cut on e.cid_id = cut.cid where device_id = '358035085192742'" "left join cid_time_cut cut on e.cid_id = cut.cid"
    df = con_sql(db, sql)
    df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
                            6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "l1", 11: "l2",
@@ -163,7 +163,8 @@ def get_predict(date,value_map):
if __name__ == '__main__':
-    path = "/home/gmuser/esmm_data/"
+    # path = "/home/gmuser/esmm_data/"
+    path = "/data/esmm/"
    date,value = get_data()
    get_predict(date, value)
@@ -2,7 +2,7 @@
PYTHON_PATH=/home/gaoyazhe/miniconda3/bin/python
MODEL_PATH=/srv/apps/ffm-baseline/tensnsorflow/es
-DATA_PATH=/home/gmuser/esmm_data
+DATA_PATH=/data/esmm
echo "rm leave tfrecord"
rm ${DATA_PATH}/tr/*
...
@@ -37,10 +37,10 @@ def native_set_join(lst):
def main():
    # native queue
-    df2 = pd.read_csv('/home/gmuser/esmm_data/native.csv')
+    df2 = pd.read_csv('/data/esmm/native.csv')
    df2['cid_id'] = df2['cid_id'].astype(str)
-    df1 = pd.read_csv("/home/gmuser/esmm_data/native/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
+    df1 = pd.read_csv("/data/esmm/native/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
    df2["ctr"],df2["cvr"],df2["ctcvr"] = df1["ctr"],df1["cvr"],df1["ctcvr"]
    df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':native_set_join}).reset_index(drop=False)
    df3.columns = ["device_id","city_id","native_queue"]
@@ -48,10 +48,10 @@ def main():
    # nearby queue
-    df2 = pd.read_csv('/home/gmuser/esmm_data/nearby.csv')
+    df2 = pd.read_csv('/data/esmm/nearby.csv')
    df2['cid_id'] = df2['cid_id'].astype(str)
-    df1 = pd.read_csv("/home/gmuser/esmm_data/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
+    df1 = pd.read_csv("/data/esmm/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
    df2["ctr"], df2["cvr"], df2["ctcvr"] = df1["ctr"], df1["cvr"], df1["ctcvr"]
    df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':nearby_set_join}).reset_index(drop=False)
    df4.columns = ["device_id","city_id","nearby_queue"]
...