Commit bbe711f9 authored by 张彦钊

change test file

parent cd12f281
@@ -75,7 +75,7 @@ def con_sql(db,sql):
     return df

 def feature_engineer():
-    apps_number, app_list_map, level2_number, level2_map, level3_number, level3_map = get_map()
+    apps_number, app_list_map, level2_number, leve2_map, level3_number, leve3_map = get_map()
     unique_values = []
     db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
     sql = "select distinct stat_date from esmm_train_data_dur"
@@ -185,56 +185,56 @@ def feature_engineer():
     df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
                              "channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids",
-                             "tag1","tag2","tag3","tag4","tag5","tag6","tag7"])
+                             "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7"])
     df = df.na.fill(dict(zip(features, features)))
-    rdd = df.select("stat_date","y", "z","app_list","level2_ids","level3_ids",
-                    "tag1","tag2","tag3","tag4","tag5","tag6","tag7",
-                    "ucity_id", "ccity_name","device_type", "manufacturer", "channel", "top", "time",
-                    "hospital_id","treatment_method", "price_min", "price_max", "treatment_time",
-                    "maintain_time","recover_time").rdd.repartition(200)\
-        .map(lambda x: (x[0],float(x[1]),float(x[2]),
-                        app_list_func(x[3], app_list_map), app_list_func(x[4], level2_map),
-                        app_list_func(x[5], level3_map), app_list_func(x[6], level2_map),
-                        app_list_func(x[7], level2_map),app_list_func(x[8], level2_map),
-                        app_list_func(x[9], level2_map),app_list_func(x[10], level2_map),
-                        app_list_func(x[11],level2_map),app_list_func(x[12], level2_map),
-                        [value_map[x[0]], value_map[x[13]],value_map[x[14]], value_map[x[15]],
-                         value_map[x[16]],value_map[x[17]],value_map[x[18]], value_map[x[19]],
-                         value_map[x[20]],value_map[x[21]],value_map[x[22]], value_map[x[23]],
-                         value_map[x[24]],value_map[x[25]],value_map[x[26]]]))
+    rdd = df.select("stat_date", "y", "z", "app_list", "level2_ids", "level3_ids",
+                    "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
+                    "ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
+                    "hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
+                    "maintain_time", "recover_time").rdd.repartition(200).map(
+        lambda x: (x[0], float(x[1]), float(x[2]), app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map),
+                   app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map),
+                   app_list_func(x[8], leve2_map), app_list_func(x[9], leve2_map), app_list_func(x[10], leve2_map),
+                   app_list_func(x[11], leve2_map), app_list_func(x[12], leve2_map),
+                   [value_map[x[0]], value_map[x[13]], value_map[x[14]], value_map[x[15]], value_map[x[16]],
+                    value_map[x[17]], value_map[x[18]], value_map[x[19]], value_map[x[20]], value_map[x[21]],
+                    value_map[x[22]], value_map[x[23]], value_map[x[24]], value_map[x[25]], value_map[x[26]]]))
     d = time.time()
     rdd.persist()
+    # TODO: remove the train filter below before going live, since the most recent day's data should also be used as training data
-    train = rdd.filter(lambda x: x[0] != validate_date)\
-        .map(lambda x:(x[1],x[2],x[3],x[4],x[5],x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13]))
-    spark.createDataFrame(train).toDF("y","z","app_list","level2_list","level3_list",
-                                      "tag1_list","tag2_list","tag3_list","tag4_list",
-                                      "tag5_list","tag6_list","tag7_list","ids") \
+    train = rdd.filter(lambda x: x[0] != validate_date).map(
+        lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
+                   x[10], x[11], x[12], x[13]))
+    f = time.time()
+    spark.createDataFrame(train).toDF("y", "z", "app_list", "level2_list", "level3_list",
+                                      "tag1_list", "tag2_list", "tag3_list", "tag4_list",
+                                      "tag5_list", "tag6_list", "tag7_list", "ids") \
         .write.format("tfrecords").save(path=path + "tr/", mode="overwrite")
     h = time.time()
     print("train tfrecord done")
-    print((h-d)/60)
+    print((h - f) / 60)
-    test = rdd.filter(lambda x: x[0] == validate_date)\
-        .map(lambda x:(x[1],x[2],x[3],x[4],x[5],x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13]))
+    test = rdd.filter(lambda x: x[0] == validate_date).map(
+        lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
+                   x[10], x[11], x[12], x[13]))
     spark.createDataFrame(test).toDF("y", "z", "app_list", "level2_list", "level3_list",
                                      "tag1_list", "tag2_list", "tag3_list", "tag4_list",
                                      "tag5_list", "tag6_list", "tag7_list", "ids") \
-        .write.format("tfrecords").save(path=path+"va/", mode="overwrite")
+        .write.format("tfrecords").save(path=path + "va/", mode="overwrite")
     print("va tfrecord done")
     rdd.unpersist()
-    return validate_date,value_map,app_list_map,level2_map,level3_map
+    return validate_date, value_map, app_list_map, leve2_map, leve3_map
-def get_predict():
+def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
     sql = "select e.y,e.z,e.label,e.ucity_id,feat.level2_ids,e.ccity_name," \
           "u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \
           "dl.app_list,e.hospital_id,feat.level3_ids," \
@@ -256,7 +256,7 @@ def get_predict():
"left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \
"left join jerry_test.train_Knowledge_network_data k on feat.level2 = k.level2_id " \
"limit 5000"
+    # TODO: remove the limit 5000 above
features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time"]
@@ -265,19 +265,19 @@ def get_predict():
     df = df.na.fill(dict(zip(features, features)))
     c = time.time()
-    rdd = df.select("label", "y", "z","ucity_id","device_id","cid_id","app_list", "level2_ids", "level3_ids",
+    rdd = df.select("label", "y", "z", "ucity_id", "device_id", "cid_id", "app_list", "level2_ids", "level3_ids",
                     "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
                     "ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
                     "hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
                     "maintain_time", "recover_time") \
-        .rdd.repartition(200).map(lambda x: (x[0],float(x[1]),float(x[2]),x[3],x[4],x[5],
-                                             app_list_func(x[6], app_list_map),app_list_func(x[7], level2_map),
-                                             app_list_func(x[8], level3_map), app_list_func(x[9], level2_map),
-                                             app_list_func(x[10], level2_map),app_list_func(x[11], level2_map),
-                                             app_list_func(x[12], level2_map), app_list_func(x[13], level2_map),
-                                             app_list_func(x[14], level2_map), app_list_func(x[15], level2_map),
-                                             [value_map.get(validate_date, 299999),value_map.get(x[16], 299998),
-                                              value_map.get(x[17], 299997),value_map.get(x[18], 299996),
+        .rdd.repartition(200).map(lambda x: (x[0], float(x[1]), float(x[2]), x[3], x[4], x[5],
+                                             app_list_func(x[6], app_list_map), app_list_func(x[7], leve2_map),
+                                             app_list_func(x[8], leve3_map), app_list_func(x[9], leve2_map),
+                                             app_list_func(x[10], leve2_map), app_list_func(x[11], leve2_map),
+                                             app_list_func(x[12], leve2_map), app_list_func(x[13], leve2_map),
+                                             app_list_func(x[14], leve2_map), app_list_func(x[15], leve2_map),
+                                             [value_map.get(date, 299999), value_map.get(x[16], 299998),
+                                              value_map.get(x[17], 299997), value_map.get(x[18], 299996),
                                               value_map.get(x[19], 299995), value_map.get(x[20], 299994),
                                               value_map.get(x[21], 299993), value_map.get(x[22], 299992),
                                               value_map.get(x[23], 299991), value_map.get(x[24], 299990),
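Note: unlike the training path, which indexes value_map[...] directly (a KeyError there would signal a bug in map construction), the prediction path uses value_map.get(..., default) with a distinct high default per feature slot (299999, 299998, ...), so out-of-vocabulary categories at inference time map to reserved IDs instead of crashing the job. In miniature, with a hypothetical two-entry map:

    value_map = {"beijing": 17, "ios": 42}
    city, platform = "unknown_city", "ios"
    # Each feature slot gets its own reserved fallback ID for unseen values.
    ids = [value_map.get(city, 299998), value_map.get(platform, 299997)]
    print(ids)  # [299998, 42]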
@@ -285,6 +285,7 @@ def get_predict():
                                              value_map.get(x[27], 299987), value_map.get(x[28], 299986),
                                              value_map.get(x[29], 299985)
                                              ]))
+    rdd.persist()
     d = time.time()
     print("rdd")
@@ -343,8 +344,9 @@ if __name__ == '__main__':
path = "hdfs:///strategy/esmm/"
local_path = "/home/gmuser/esmm/"
validate_date, value_map, app_list_map, level2_map, level3_map = feature_engineer()
get_predict()
validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer()
get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map)
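Note: this __main__ change is the point of the refactor: get_predict no longer reaches for globals but receives the validation date and the vocabularies that feature_engineer built, so training and prediction are guaranteed to encode features with the same maps. A trimmed hypothetical flow showing why the return values are threaded through:

    def feature_engineer():
        value_map = {"beijing": 17}      # built once from the training data
        validate_date = "2019-05-01"
        return validate_date, value_map

    def get_predict(date, value_map):
        # Prediction reuses the training-time encodings instead of rebuilding them.
        return [value_map.get("beijing", 299998), value_map.get(date, 299999)]

    validate_date, value_map = feature_engineer()
    print(get_predict(validate_date, value_map))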