Commit 0c7878dc authored by 张彦钊's avatar 张彦钊

特征工程增加分享数据

parent 1786cc9c
...@@ -104,15 +104,15 @@ def feature_engineer(): ...@@ -104,15 +104,15 @@ def feature_engineer():
unique_values = [] unique_values = []
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct stat_date from esmm_train_data_dwell" sql = "select distinct stat_date from esmm_train_data_share"
unique_values.extend(get_unique(db,sql)) unique_values.extend(get_unique(db,sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct ucity_id from esmm_train_data_dwell" sql = "select distinct ucity_id from esmm_train_data_share"
unique_values.extend(get_unique(db, sql)) unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct ccity_name from esmm_train_data_dwell" sql = "select distinct ccity_name from esmm_train_data_share"
unique_values.extend(get_unique(db, sql)) unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
...@@ -162,7 +162,7 @@ def feature_engineer(): ...@@ -162,7 +162,7 @@ def feature_engineer():
# unique_values.append("video") # unique_values.append("video")
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_data_dwell" sql = "select max(stat_date) from esmm_train_data_share"
validate_date = con_sql(db, sql)[0].values.tolist()[0] validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date) print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d") temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
...@@ -170,7 +170,7 @@ def feature_engineer(): ...@@ -170,7 +170,7 @@ def feature_engineer():
print(start) print(start)
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC')
sql = "select distinct doctor.hospital_id from jerry_test.esmm_train_data_dwell e " \ sql = "select distinct doctor.hospital_id from jerry_test.esmm_train_data_share e " \
"left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \ "left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \
"left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \ "left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \
"where e.stat_date >= '{}'".format(start) "where e.stat_date >= '{}'".format(start)
...@@ -190,13 +190,13 @@ def feature_engineer(): ...@@ -190,13 +190,13 @@ def feature_engineer():
29 + apps_number + level2_number + level3_number + len(unique_values))) 29 + apps_number + level2_number + level3_number + len(unique_values)))
value_map = dict(zip(unique_values, temp)) value_map = dict(zip(unique_values, temp))
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \ sql = "select e.y,e.z,e.s,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
"u.channel,c.top,cut.time,dl.app_list,feat.level3_ids,doctor.hospital_id," \ "u.channel,c.top,cut.time,dl.app_list,feat.level3_ids,doctor.hospital_id," \
"wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \ "wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \
"ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7,doris.search_tag2,doris.search_tag3," \ "ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7,doris.search_tag2,doris.search_tag3," \
"k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time," \ "k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time," \
"e.device_id,e.cid_id " \ "e.device_id,e.cid_id " \
"from jerry_test.esmm_train_data_dwell e left join jerry_test.user_feature u on e.device_id = u.device_id " \ "from jerry_test.esmm_train_data_share e left join jerry_test.user_feature u on e.device_id = u.device_id " \
"left join jerry_test.cid_type_top c on e.device_id = c.device_id " \ "left join jerry_test.cid_type_top c on e.device_id = c.device_id " \
"left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \ "left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \
"left join jerry_test.device_app_list dl on e.device_id = dl.device_id " \ "left join jerry_test.device_app_list dl on e.device_id = dl.device_id " \
...@@ -226,9 +226,9 @@ def feature_engineer(): ...@@ -226,9 +226,9 @@ def feature_engineer():
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7", "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time", "ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
"hospital_id", "treatment_method", "price_min", "price_max", "treatment_time", "hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
"maintain_time", "recover_time", "search_tag2", "search_tag3","cid_id","device_id")\ "maintain_time", "recover_time", "search_tag2", "search_tag3","cid_id","device_id","s")\
.rdd.repartition(200).map( .rdd.repartition(200).map(
lambda x: (x[0], float(x[1]), float(x[2]), app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map), lambda x: (x[0], float(x[1]), float(x[2]), float(x[31]),app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map),
app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map), app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map),
app_list_func(x[8], leve2_map), app_list_func(x[9], leve2_map), app_list_func(x[10], leve2_map), app_list_func(x[8], leve2_map), app_list_func(x[9], leve2_map), app_list_func(x[10], leve2_map),
app_list_func(x[11], leve2_map), app_list_func(x[12], leve2_map), app_list_func(x[11], leve2_map), app_list_func(x[12], leve2_map),
...@@ -245,32 +245,29 @@ def feature_engineer(): ...@@ -245,32 +245,29 @@ def feature_engineer():
# TODO 上线后把下面train fliter 删除,因为最近一天的数据也要作为训练集 # TODO 上线后把下面train fliter 删除,因为最近一天的数据也要作为训练集
train = rdd.map( train = rdd.filter(lambda x: x[0] != validate_date).map(
lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18])) x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18],x[19]))
f = time.time() f = time.time()
spark.createDataFrame(train).toDF("y", "z", "app_list", "level2_list", "level3_list", spark.createDataFrame(train).toDF("y", "z", "app_list", "level2_list", "level3_list",
"tag1_list", "tag2_list", "tag3_list", "tag4_list", "tag1_list", "tag2_list", "tag3_list", "tag4_list",
"tag5_list", "tag6_list", "tag7_list", "ids", "tag5_list", "tag6_list", "tag7_list", "ids",
"search_tag2_list","search_tag3_list","city","cid_id","uid") \ "search_tag2_list","search_tag3_list","city","cid_id","uid","s") \
.repartition(1).write.format("tfrecords").save(path=path + "tr/", mode="overwrite") .repartition(1).write.format("tfrecords").save(path=path + "tr/", mode="overwrite")
h = time.time() h = time.time()
print("train tfrecord done") print("train tfrecord done")
print((h - f) / 60) print((h - f) / 60)
print("训练集样本总量:")
print(rdd.count())
get_pre_number() get_pre_number()
test = rdd.filter(lambda x: x[0] == validate_date).map( test = rdd.filter(lambda x: x[0] == validate_date).map(
lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18])) x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18],x[19]))
spark.createDataFrame(test).toDF("y", "z", "app_list", "level2_list", "level3_list", spark.createDataFrame(test).toDF("y", "z", "app_list", "level2_list", "level3_list",
"tag1_list", "tag2_list", "tag3_list", "tag4_list", "tag1_list", "tag2_list", "tag3_list", "tag4_list",
"tag5_list", "tag6_list", "tag7_list", "ids", "tag5_list", "tag6_list", "tag7_list", "ids",
"search_tag2_list","search_tag3_list","city","cid_id","uid") \ "search_tag2_list","search_tag3_list","city","cid_id","uid","s") \
.repartition(1).write.format("tfrecords").save(path=path + "va/", mode="overwrite") .repartition(1).write.format("tfrecords").save(path=path + "va/", mode="overwrite")
print("va tfrecord done") print("va tfrecord done")
...@@ -386,7 +383,7 @@ if __name__ == '__main__': ...@@ -386,7 +383,7 @@ if __name__ == '__main__':
local_path = "/home/gmuser/esmm/" local_path = "/home/gmuser/esmm/"
validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer() validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer()
get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map) # get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map)
spark.stop() spark.stop()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment