Commit 1443f034 authored by 王志伟's avatar 王志伟

跑之前的数据

parents de49f95a 015cd69f
...@@ -155,7 +155,7 @@ def feature_engineer(): ...@@ -155,7 +155,7 @@ def feature_engineer():
print(start) print(start)
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC')
sql = "select doctor.hospital_id from jerry_test.esmm_train_data_dwell e " \ sql = "select distinct doctor.hospital_id from jerry_test.esmm_train_data_dwell e " \
"left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \ "left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \
"left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \ "left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \
"where e.stat_date >= '{}'".format(start) "where e.stat_date >= '{}'".format(start)
...@@ -212,9 +212,11 @@ def feature_engineer(): ...@@ -212,9 +212,11 @@ def feature_engineer():
app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map), app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map),
app_list_func(x[8], leve2_map), app_list_func(x[9], leve2_map), app_list_func(x[10], leve2_map), app_list_func(x[8], leve2_map), app_list_func(x[9], leve2_map), app_list_func(x[10], leve2_map),
app_list_func(x[11], leve2_map), app_list_func(x[12], leve2_map), app_list_func(x[11], leve2_map), app_list_func(x[12], leve2_map),
[value_map[x[0]], value_map[x[13]], value_map[x[14]], value_map[x[15]], value_map[x[16]], [value_map.get(x[0],1), value_map.get(x[13],2), value_map.get(x[14],3), value_map.get(x[15],4),
value_map[x[17]], value_map[x[18]], value_map[x[19]], value_map[x[20]], value_map[x[21]], value_map.get(x[16],5),value_map.get(x[17],6), value_map.get(x[18],7), value_map.get(x[19],8),
value_map[x[22]], value_map[x[23]], value_map[x[24]], value_map[x[25]], value_map[x[26]]])) value_map.get(x[20],9), value_map.get(x[21],10),
value_map.get(x[22],11), value_map.get(x[23],12), value_map.get(x[24],13),
value_map.get(x[25],14), value_map.get(x[26],15)]))
rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER) rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
......
...@@ -151,7 +151,7 @@ def feature_engineer(): ...@@ -151,7 +151,7 @@ def feature_engineer():
validate_date = con_sql(db, sql)[0].values.tolist()[0] validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date) print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d") temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=3)).strftime("%Y-%m-%d") start = (temp - datetime.timedelta(days=10)).strftime("%Y-%m-%d")
print(start) print(start)
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC')
...@@ -173,82 +173,86 @@ def feature_engineer(): ...@@ -173,82 +173,86 @@ def feature_engineer():
16 + apps_number + level2_number + level3_number + len(unique_values))) 16 + apps_number + level2_number + level3_number + len(unique_values)))
value_map = dict(zip(unique_values, temp)) value_map = dict(zip(unique_values, temp))
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \ # sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
"u.channel,c.top,cut.time,dl.app_list,feat.level3_ids,doctor.hospital_id," \ # "u.channel,c.top,cut.time,dl.app_list,feat.level3_ids,doctor.hospital_id," \
"wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \ # "wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \
"ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7," \ # "ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7," \
"k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \ # "k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \
"from jerry_test.esmm_train_data_dwell e left join jerry_test.user_feature u on e.device_id = u.device_id " \ # "from jerry_test.esmm_train_data_dwell e left join jerry_test.user_feature u on e.device_id = u.device_id " \
"left join jerry_test.cid_type_top c on e.device_id = c.device_id " \ # "left join jerry_test.cid_type_top c on e.device_id = c.device_id " \
"left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \ # "left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \
"left join jerry_test.device_app_list dl on e.device_id = dl.device_id " \ # "left join jerry_test.device_app_list dl on e.device_id = dl.device_id " \
"left join jerry_test.diary_feat feat on e.cid_id = feat.diary_id " \ # "left join jerry_test.diary_feat feat on e.cid_id = feat.diary_id " \
"left join jerry_test.knowledge k on feat.level2 = k.level2_id " \ # "left join jerry_test.knowledge k on feat.level2 = k.level2_id " \
"left join jerry_test.wiki_tag wiki on e.device_id = wiki.device_id " \ # "left join jerry_test.wiki_tag wiki on e.device_id = wiki.device_id " \
"left join jerry_test.question_tag question on e.device_id = question.device_id " \ # "left join jerry_test.question_tag question on e.device_id = question.device_id " \
"left join jerry_test.search_tag search on e.device_id = search.device_id " \ # "left join jerry_test.search_tag search on e.device_id = search.device_id " \
"left join jerry_test.budan_tag budan on e.device_id = budan.device_id " \ # "left join jerry_test.budan_tag budan on e.device_id = budan.device_id " \
"left join jerry_test.order_tag ot on e.device_id = ot.device_id " \ # "left join jerry_test.order_tag ot on e.device_id = ot.device_id " \
"left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \ # "left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \
"left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \ # "left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \
"left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \ # "left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \
"left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \ # "left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \
"where e.stat_date >= '{}'".format(start) # "where e.stat_date >= '{}'".format(start)
#
df = spark.sql(sql) # df = spark.sql(sql)
#
df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer", # df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids", # "channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids",
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7"]) # "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7"])
#
df = df.na.fill(dict(zip(features, features))) # df = df.na.fill(dict(zip(features, features)))
#
rdd = df.select("stat_date", "y", "z", "app_list", "level2_ids", "level3_ids", # rdd = df.select("stat_date", "y", "z", "app_list", "level2_ids", "level3_ids",
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7", # "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time", # "ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
"hospital_id", "treatment_method", "price_min", "price_max", "treatment_time", # "hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
"maintain_time", "recover_time").rdd.repartition(200).map( # "maintain_time", "recover_time").rdd.repartition(200).map(
lambda x: (x[0], float(x[1]), float(x[2]), app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map), # lambda x: (x[0], float(x[1]), float(x[2]), app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map),
app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map), # app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map),
app_list_func(x[8], leve2_map), app_list_func(x[9], leve2_map), app_list_func(x[10], leve2_map), # app_list_func(x[8], leve2_map), app_list_func(x[9], leve2_map), app_list_func(x[10], leve2_map),
app_list_func(x[11], leve2_map), app_list_func(x[12], leve2_map), # app_list_func(x[11], leve2_map), app_list_func(x[12], leve2_map),
[value_map[x[0]], value_map[x[13]], value_map[x[14]], value_map[x[15]], value_map[x[16]], # [value_map[x[0]], value_map[x[13]], value_map[x[14]], value_map[x[15]], value_map[x[16]],
value_map[x[17]], value_map[x[18]], value_map[x[19]], value_map[x[20]], value_map[x[21]], # value_map[x[17]], value_map[x[18]], value_map[x[19]], value_map[x[20]], value_map[x[21]],
value_map[x[22]], value_map[x[23]], value_map[x[24]], value_map[x[25]], value_map[x[26]]])) # value_map[x[22]], value_map[x[23]], value_map[x[24]], value_map[x[25]], value_map[x[26]]]))\
# .zipWithIndex().map(lambda x:(x[0][0],x[0][1],x[0][2],x[0][3],x[0][4],x[0][5],x[0][6],x[0][7],x[0][8],
rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER) # x[0][9],x[0][10],x[0][11],x[0][12],x[0][13],
# x[1]))
# TODO 上线后把下面train fliter 删除,因为最近一天的数据也要作为训练集 #
#
train = rdd.map( # rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], #
x[10], x[11], x[12], x[13])) # # TODO 上线后把下面train fliter 删除,因为最近一天的数据也要作为训练集
f = time.time() #
spark.createDataFrame(train).toDF("y", "z", "app_list", "level2_list", "level3_list", # train = rdd.map(
"tag1_list", "tag2_list", "tag3_list", "tag4_list", # lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
"tag5_list", "tag6_list", "tag7_list", "ids") \ # x[10], x[11], x[12], x[13],x[14]))
.repartition(1).write.format("tfrecords").save(path=path + "tr/", mode="overwrite") # f = time.time()
h = time.time() # spark.createDataFrame(train).toDF("y", "z", "app_list", "level2_list", "level3_list",
print("train tfrecord done") # "tag1_list", "tag2_list", "tag3_list", "tag4_list",
print((h - f) / 60) # "tag5_list", "tag6_list", "tag7_list", "ids","number") \
# .repartition(1).write.format("tfrecords").save(path=path + "test_tr/", mode="overwrite")
print("训练集样本总量:") # h = time.time()
print(rdd.count()) # print("train tfrecord done")
# print((h - f) / 60)
get_pre_number() #
# print("训练集样本总量:")
test = rdd.filter(lambda x: x[0] == validate_date).map( # print(rdd.count())
lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], #
x[10], x[11], x[12], x[13])) # get_pre_number()
#
spark.createDataFrame(test).toDF("y", "z", "app_list", "level2_list", "level3_list", # test = rdd.filter(lambda x: x[0] == validate_date).map(
"tag1_list", "tag2_list", "tag3_list", "tag4_list", # lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
"tag5_list", "tag6_list", "tag7_list", "ids") \ # x[10], x[11], x[12], x[13],x[14]))
.repartition(1).write.format("tfrecords").save(path=path + "va/", mode="overwrite") #
# spark.createDataFrame(test).toDF("y", "z", "app_list", "level2_list", "level3_list",
print("va tfrecord done") # "tag1_list", "tag2_list", "tag3_list", "tag4_list",
# "tag5_list", "tag6_list", "tag7_list", "ids","number") \
rdd.unpersist() # .repartition(1).write.format("tfrecords").save(path=path + "va/", mode="overwrite")
#
# print("va tfrecord done")
#
# rdd.unpersist()
return validate_date, value_map, app_list_map, leve2_map, leve3_map return validate_date, value_map, app_list_map, leve2_map, leve3_map
...@@ -273,7 +277,7 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map): ...@@ -273,7 +277,7 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
"left join jerry_test.order_tag ot on e.device_id = ot.device_id " \ "left join jerry_test.order_tag ot on e.device_id = ot.device_id " \
"left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \ "left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \
"left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \ "left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \
"left join jerry_test.knowledge k on feat.level2 = k.level2_id limit 1000" "left join jerry_test.knowledge k on feat.level2 = k.level2_id"
features = ["ucity_id", "ccity_name", "device_type", "manufacturer", features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id", "channel", "top", "time", "hospital_id",
...@@ -302,34 +306,39 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map): ...@@ -302,34 +306,39 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
value_map.get(x[25], 11), value_map.get(x[26], 12), value_map.get(x[25], 11), value_map.get(x[26], 12),
value_map.get(x[27], 13), value_map.get(x[28], 14), value_map.get(x[27], 13), value_map.get(x[28], 14),
value_map.get(x[29], 15) value_map.get(x[29], 15)
])) ]))\
.zipWithIndex().map(lambda x:(x[0][0],x[0][1],x[0][2],x[0][3],x[0][4],x[0][5],x[0][6],x[0][7],x[0][8],
x[0][9],x[0][10],x[0][11],x[0][12],x[0][13],x[0][14],x[0][15],x[0][16],
x[1]))
rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER) rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
native_pre = spark.createDataFrame(rdd.filter(lambda x:x[0] == 0).map(lambda x:(x[3],x[4],x[5])))\ # native_pre = spark.createDataFrame(rdd.filter(lambda x:x[0] == 0).map(lambda x:(x[3],x[4],x[5],x[17])))\
.toDF("city","uid","cid_id") # .toDF("city","uid","cid_id","number")
print("native csv") # print("native csv")
native_pre.repartition(1).write.format('com.databricks.spark.csv').save(path + "native/", header='true') # native_pre.toPandas().to_csv(local_path+"native.csv", header=True)
spark.createDataFrame(rdd.filter(lambda x: x[0] == 0) spark.createDataFrame(rdd.filter(lambda x: x[0] == 0)
.map(lambda x: (x[1],x[2],x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13],x[14],x[15],x[16]))) \ .map(lambda x: (x[1],x[2],x[6],x[7],x[8],x[9],x[10],x[11],
x[12],x[13],x[14],x[15],x[16],x[17],x[3],x[4],x[5]))) \
.toDF("y","z","app_list", "level2_list", "level3_list","tag1_list", "tag2_list", "tag3_list", "tag4_list", .toDF("y","z","app_list", "level2_list", "level3_list","tag1_list", "tag2_list", "tag3_list", "tag4_list",
"tag5_list", "tag6_list", "tag7_list", "ids").repartition(1).write.format("tfrecords") \ "tag5_list", "tag6_list", "tag7_list", "ids","number","city","uid","cid_id")\
.save(path=path+"native/", mode="overwrite") .repartition(100).write.format("tfrecords").save(path=path+"test_native/", mode="overwrite")
print("native tfrecord done") print("native tfrecord done")
h = time.time() h = time.time()
print((h-f)/60) print((h-f)/60)
nearby_pre = spark.createDataFrame(rdd.filter(lambda x: x[0] == 1).map(lambda x: (x[3], x[4], x[5]))) \ # nearby_pre = spark.createDataFrame(rdd.filter(lambda x: x[0] == 1).map(lambda x: (x[3], x[4], x[5],x[17]))) \
.toDF("city", "uid", "cid_id") # .toDF("city", "uid", "cid_id","number")
print("nearby csv") # print("nearby csv")
nearby_pre.repartition(1).write.format('com.databricks.spark.csv').save(path + "nearby/", header='true') # nearby_pre.toPandas().to_csv(local_path + "nearby.csv", header=True)
spark.createDataFrame(rdd.filter(lambda x: x[0] == 1) spark.createDataFrame(rdd.filter(lambda x: x[0] == 1)
.map( .map(lambda x: (x[1], x[2], x[6], x[7], x[8], x[9], x[10], x[11],
lambda x: (x[1], x[2], x[6], x[7], x[8], x[9], x[10], x[11], x[12], x[13], x[14], x[15], x[16]))) \ x[12],x[13], x[14], x[15], x[16],x[17],x[3],x[4],x[5]))) \
.toDF("y", "z", "app_list", "level2_list", "level3_list", "tag1_list", "tag2_list", "tag3_list", "tag4_list", .toDF("y", "z", "app_list", "level2_list", "level3_list", "tag1_list", "tag2_list", "tag3_list", "tag4_list",
"tag5_list", "tag6_list", "tag7_list", "ids").repartition(1).write.format("tfrecords") \ "tag5_list", "tag6_list", "tag7_list", "ids","number","city","uid","cid_id")\
.save(path=path + "nearby/", mode="overwrite") .repartition(100).write.format("tfrecords").save(path=path + "test_nearby/", mode="overwrite")
print("nearby tfrecord done") print("nearby tfrecord done")
...@@ -350,34 +359,8 @@ if __name__ == '__main__': ...@@ -350,34 +359,8 @@ if __name__ == '__main__':
path = "hdfs:///strategy/esmm/" path = "hdfs:///strategy/esmm/"
local_path = "/home/gmuser/esmm/" local_path = "/home/gmuser/esmm/"
# validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer() validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer()
# get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map) get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map)
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
"u.channel,c.top,cut.time,dl.app_list,feat.level3_ids,doctor.hospital_id," \
"wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \
"ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7," \
"k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \
"from jerry_test.esmm_train_data_dwell e left join jerry_test.user_feature u on e.device_id = u.device_id " \
"left join jerry_test.cid_type_top c on e.device_id = c.device_id " \
"left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \
"left join jerry_test.device_app_list dl on e.device_id = dl.device_id " \
"left join jerry_test.diary_feat feat on e.cid_id = feat.diary_id " \
"left join jerry_test.knowledge k on feat.level2 = k.level2_id " \
"left join jerry_test.wiki_tag wiki on e.device_id = wiki.device_id " \
"left join jerry_test.question_tag question on e.device_id = question.device_id " \
"left join jerry_test.search_tag search on e.device_id = search.device_id " \
"left join jerry_test.budan_tag budan on e.device_id = budan.device_id " \
"left join jerry_test.order_tag ot on e.device_id = ot.device_id " \
"left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \
"left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \
"left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \
"left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \
"where e.stat_date >= '2019-06-10'"
df = spark.sql(sql)
df.repartition(1).write.format('com.databricks.spark.csv').save(path + "native/a.csv", header='true')
spark.stop() spark.stop()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment