Commit 98b5d9b2 authored by 王志伟's avatar 王志伟
parents e6cc92de 7e85adec
...@@ -396,7 +396,7 @@ object EsmmPredData { ...@@ -396,7 +396,7 @@ object EsmmPredData {
|where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='${yesteday_have_seq}') |where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='${yesteday_have_seq}')
""".stripMargin """.stripMargin
) )
raw_data.show() // raw_data.show()
val raw_data1 = raw_data.rdd.groupBy(_.getAs[String]("device_city")).map { val raw_data1 = raw_data.rdd.groupBy(_.getAs[String]("device_city")).map {
...@@ -406,7 +406,7 @@ object EsmmPredData { ...@@ -406,7 +406,7 @@ object EsmmPredData {
val cids = Try(cid_data.toSeq.map(_.getAs[String]("merge_queue").split(",")).flatMap(_.zipWithIndex).sortBy(_._2).map(_._1).distinct.take(500).mkString(",")).getOrElse("") val cids = Try(cid_data.toSeq.map(_.getAs[String]("merge_queue").split(",")).flatMap(_.zipWithIndex).sortBy(_._2).map(_._1).distinct.take(500).mkString(",")).getOrElse("")
(device_id,city_id ,s"$cids") (device_id,city_id ,s"$cids")
}.filter(_._3!="").toDF("device_id","city_id","merge_queue") }.filter(_._3!="").toDF("device_id","city_id","merge_queue")
println("nearby_device_count",raw_data1.count()) // println("nearby_device_count",raw_data1.count())
val start= LocalDate.now().minusDays(14).toString val start= LocalDate.now().minusDays(14).toString
import sc.implicits._ import sc.implicits._
...@@ -443,7 +443,7 @@ object EsmmPredData { ...@@ -443,7 +443,7 @@ object EsmmPredData {
""".stripMargin """.stripMargin
).withColumn("label",lit(1)) ).withColumn("label",lit(1))
raw_data2.createOrReplaceTempView("raw_data2") raw_data2.createOrReplaceTempView("raw_data2")
println("nearby_explode_count",raw_data2.count()) // println("nearby_explode_count",raw_data2.count())
// native_data // native_data
...@@ -455,7 +455,7 @@ object EsmmPredData { ...@@ -455,7 +455,7 @@ object EsmmPredData {
|where a.stat_date='${yesteday_have_seq}' and b.native_queue != "" |where a.stat_date='${yesteday_have_seq}' and b.native_queue != ""
""".stripMargin """.stripMargin
) )
println("native_device_count",native_data.count()) // println("native_device_count",native_data.count())
if (history.take(1).nonEmpty){ if (history.take(1).nonEmpty){
native_data.createOrReplaceTempView("temp") native_data.createOrReplaceTempView("temp")
...@@ -479,9 +479,7 @@ object EsmmPredData { ...@@ -479,9 +479,7 @@ object EsmmPredData {
""".stripMargin """.stripMargin
).withColumn("label",lit(0)) ).withColumn("label",lit(0))
native_data1.createOrReplaceTempView("native_data1") native_data1.createOrReplaceTempView("native_data1")
println("native_explode_count",native_data1.count()) // println("native_explode_count",native_data1.count())
//union //union
val union_data = sc.sql( val union_data = sc.sql(
...@@ -492,7 +490,7 @@ object EsmmPredData { ...@@ -492,7 +490,7 @@ object EsmmPredData {
""".stripMargin """.stripMargin
) )
union_data.createOrReplaceTempView("raw_data") union_data.createOrReplaceTempView("raw_data")
println("union_count",union_data.count()) // println("union_count",union_data.count())
//join feat //join feat
...@@ -508,7 +506,7 @@ object EsmmPredData { ...@@ -508,7 +506,7 @@ object EsmmPredData {
""".stripMargin """.stripMargin
) )
// sid_data.show() // sid_data.show()
println(sid_data.count()) // println(sid_data.count())
val sid_data_label = sid_data.withColumn("y",lit(0)).withColumn("z",lit(0)) val sid_data_label = sid_data.withColumn("y",lit(0)).withColumn("z",lit(0))
sid_data_label.createOrReplaceTempView("union_data") sid_data_label.createOrReplaceTempView("union_data")
...@@ -556,10 +554,29 @@ object EsmmPredData { ...@@ -556,10 +554,29 @@ object EsmmPredData {
union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name") union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name")
// union_data_ccity_name.show() // union_data_ccity_name.show()
val jdbcDF = sc.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", "jdbc:mysql://rdsfewzdmf0jfjp9un8xj.mysql.rds.aliyuncs.com:3306/zhengxing")
.option("dbtable", "api_punishment")
.option("user", "work")
.option("password", "BJQaT9VzDcuPBqkd")
.load()
jdbcDF.createOrReplaceTempView("api_punishment")
val now = LocalDate.now().toString
val punish_doctor = sc.sql(
s"""
|select doctor_id from api_punishment
|where end_time > '$now'
""".stripMargin).collect().map(x => x(0).toString).distinct
println("punish_doctor")
println(punish_doctor.length)
val union_data_scity_id = sc.sql( val union_data_scity_id = sc.sql(
s""" s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name, |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
| d.city_id as scity_id | d.city_id as scity_id,b.doctor_id,c.hospital_id
|from union_data_ccity_name a |from union_data_ccity_name a
|left join online.tl_meigou_service_view b on a.diary_service_id=b.id |left join online.tl_meigou_service_view b on a.diary_service_id=b.id
|left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id |left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id
...@@ -567,20 +584,22 @@ object EsmmPredData { ...@@ -567,20 +584,22 @@ object EsmmPredData {
|where b.partition_date='${yesteday}' |where b.partition_date='${yesteday}'
|and c.partition_date='${yesteday}' |and c.partition_date='${yesteday}'
|and d.partition_date='${yesteday}' |and d.partition_date='${yesteday}'
|and b.doctor_id not in (${punish_doctor.map(x => s"'$x'").mkString(",")})
""".stripMargin """.stripMargin
) )
union_data_scity_id.createOrReplaceTempView("union_data_scity_id") union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
val union_data_scity_id2 = sc.sql( val union_data_scity_id2 = sc.sql(
s""" s"""
|select device_id,cid_id,first(stat_date) stat_date,first(ucity_id) ucity_id,label,first(diary_service_id)diary_service_id,first(y) y, |select device_id,cid_id,first(stat_date) stat_date,first(ucity_id) ucity_id,label,first(diary_service_id)diary_service_id,first(y) y,
|first(z) z,first(clevel1_id) clevel1_id,first(slevel1_id) slevel1_id,first(ccity_name) ccity_name,first(scity_id) scity_id |first(z) z,first(clevel1_id) clevel1_id,first(slevel1_id) slevel1_id,first(ccity_name) ccity_name,
|first(scity_id) scity_id,first(hospital_id) hospital_id
|from union_data_scity_id |from union_data_scity_id
|group by device_id,cid_id,label |group by device_id,cid_id,label
""".stripMargin """.stripMargin
) )
// union_data_scity_id.createOrReplaceTempView("union_data_scity_id") // union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
// println(union_data_scity_id2.count()) // println(union_data_scity_id2.count())
union_data_scity_id2.persist() union_data_scity_id2.persist()
......
...@@ -37,11 +37,11 @@ def get_data(): ...@@ -37,11 +37,11 @@ def get_data():
validate_date = con_sql(db, sql)[0].values.tolist()[0] validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date) print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d") temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=300)).strftime("%Y-%m-%d") start = (temp - datetime.timedelta(days=30)).strftime("%Y-%m-%d")
print(start) print(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name," \ sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
"u.device_type,u.manufacturer,u.channel,c.top,e.device_id,cut.time,dl.app_list " \ "u.channel,c.top,e.device_id,cut.time,dl.app_list,e.diary_service_id,feat.level3_ids " \
"from {} e left join user_feature u on e.device_id = u.device_id " \ "from {} e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id " \ "left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_time_cut cut on e.cid_id = cut.cid " \ "left join cid_time_cut cut on e.cid_id = cut.cid " \
...@@ -52,31 +52,31 @@ def get_data(): ...@@ -52,31 +52,31 @@ def get_data():
# print(df.shape) # print(df.shape)
df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name", df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "device_id", 6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "device_id",
11: "time",12:"app_list"}) 11: "time",12:"app_list",13:"service_id",14:"level3_ids"})
print("esmm data ok") print("esmm data ok")
print(df.shape)
# print(df.head(2) # print(df.head(2)
service_id = tuple(df["service_id"].unique())
db = pymysql.connect(host='rdsfewzdmf0jfjp9un8xj.mysql.rds.aliyuncs.com', port=3306, user='work', passwd='BJQaT9VzDcuPBqkd', db='zhengxing')
sql = "select s.id,d.hospital_id from api_service s left join api_doctor d on s.doctor_id = d.id where s.id in {}".format(service_id)
hospital = con_sql(db, sql)
hospital = hospital.rename(columns={0: "service_id", 1: "hospital_id"})
df = pd.merge(df, hospital, on='service_id', how='left')
print("before") print("before")
print(df.shape) print(df.shape)
print("after") print("after")
df = df.drop_duplicates() df = df.drop_duplicates()
df = df.drop_duplicates(["ucity_id", "clevel2_id", "ccity_name", "device_type", "manufacturer", df = df.drop_duplicates(["ucity_id", "clevel2_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date","app_list"]) "channel", "top", "time", "stat_date","app_list","hospital_id","level3_ids"])
app_list_number,app_list_map = multi_hot(df,"app_list",1) app_list_number,app_list_map = multi_hot(df,"app_list",1)
level2_number,level2_map = multi_hot(df,"clevel2_id",1+app_list_number) level2_number,level2_map = multi_hot(df,"clevel2_id",1+app_list_number)
# df["app_list"] = df["app_list"].fillna("lost_na") level3_number,level3_ids = multi_hot(df, "hospital_id", 1 + app_list_number + level2_number)
# app_list_value = [i.split(",") for i in df["app_list"].unique()]
# app_list_unique = []
# for i in app_list_value:
# app_list_unique.extend(i)
# app_list_unique = list(set(app_list_unique))
# app_list_map = dict(zip(app_list_unique, list(range(1, len(app_list_unique) + 1))))
# df["app_list"] = df["app_list"].apply(app_list_func,args=(app_list_map,))
unique_values = [] unique_values = []
features = ["ucity_id", "ccity_name", "device_type", "manufacturer", features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date"] "channel", "top", "time", "stat_date","hospital_id"]
for i in features: for i in features:
df[i] = df[i].astype("str") df[i] = df[i].astype("str")
df[i] = df[i].fillna("lost") df[i] = df[i].fillna("lost")
...@@ -84,14 +84,14 @@ def get_data(): ...@@ -84,14 +84,14 @@ def get_data():
df[i] = df[i] + i df[i] = df[i] + i
unique_values.extend(list(df[i].unique())) unique_values.extend(list(df[i].unique()))
temp = list(range(1+app_list_number+level2_number, 1 + app_list_number+level2_number + len(unique_values))) temp = list(range(1+app_list_number+level2_number + level3_number, 1 + app_list_number+level2_number + level3_number + len(unique_values)))
value_map = dict(zip(unique_values,temp)) value_map = dict(zip(unique_values,temp))
df = df.drop("device_id", axis=1) df = df.drop("device_id", axis=1)
train = df[df["stat_date"] != validate_date+"stat_date"] train = df[df["stat_date"] != validate_date+"stat_date"]
test = df[df["stat_date"] == validate_date+"stat_date"] test = df[df["stat_date"] == validate_date+"stat_date"]
for i in ["ucity_id", "ccity_name", "device_type", "manufacturer", for i in ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date"]: "channel", "top", "time", "stat_date","hospital_id"]:
train[i] = train[i].map(value_map) train[i] = train[i].map(value_map)
test[i] = test[i].map(value_map) test[i] = test[i].map(value_map)
...@@ -193,5 +193,5 @@ if __name__ == '__main__': ...@@ -193,5 +193,5 @@ if __name__ == '__main__':
train_data_set = "esmm_train_data" train_data_set = "esmm_train_data"
path = "/data/esmm/" path = "/data/esmm/"
date,value,app_list,level2 = get_data() date,value,app_list,level2 = get_data()
get_predict(date, value,app_list,level2) # get_predict(date, value,app_list,level2)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment