Commit 02e1d7ad authored by 张彦钊's avatar 张彦钊

change test file

parent b37769af
......@@ -75,6 +75,35 @@ def con_sql(db,sql):
db.close()
return df
def train_yingshe(x):
tmp = (x[0], float(x[1]), float(x[2]), app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map),
app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map),
app_list_func(x[8], leve2_map), app_list_func(x[9], leve2_map), app_list_func(x[10], leve2_map),
app_list_func(x[11], leve2_map), app_list_func(x[12], leve2_map),
[value_map[x[0]], value_map[x[13]], value_map[x[14]], value_map[x[15]], value_map[x[16]],
value_map[x[17]], value_map[x[18]], value_map[x[19]], value_map[x[20]], value_map[x[21]],
value_map[x[22]], value_map[x[23]], value_map[x[24]], value_map[x[25]], value_map[x[26]]])
return tmp
def pre_yingshe(x):
tmp = (x[0], float(x[1]), float(x[2]), x[3], x[4], x[5],
app_list_func(x[6], app_list_map), app_list_func(x[7], leve2_map),
app_list_func(x[8], leve3_map), app_list_func(x[9], leve2_map),
app_list_func(x[10], leve2_map), app_list_func(x[11], leve2_map),
app_list_func(x[12], leve2_map), app_list_func(x[13], leve2_map),
app_list_func(x[14], leve2_map), app_list_func(x[15], leve2_map),
[value_map.get(validate_date, 1), value_map.get(x[16], 2),
value_map.get(x[17], 3), value_map.get(x[18], 4),
value_map.get(x[19], 5), value_map.get(x[20], 6),
value_map.get(x[21], 7), value_map.get(x[22], 8),
value_map.get(x[23], 9), value_map.get(x[24], 10),
value_map.get(x[25], 11), value_map.get(x[26], 12),
value_map.get(x[27], 13), value_map.get(x[28], 14),
value_map.get(x[29], 15)
])
return tmp
def feature_engineer():
apps_number, app_list_map, level2_number, leve2_map, level3_number, leve3_map = get_map()
unique_values = []
......@@ -191,18 +220,25 @@ def feature_engineer():
df = df.na.fill(dict(zip(features, features)))
# rdd = df.select("stat_date", "y", "z", "app_list", "level2_ids", "level3_ids",
# "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
# "ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
# "hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
# "maintain_time", "recover_time").rdd.repartition(200).map(
# lambda x: (x[0], float(x[1]), float(x[2]), app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map),
# app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map),
# app_list_func(x[8], leve2_map), app_list_func(x[9], leve2_map), app_list_func(x[10], leve2_map),
# app_list_func(x[11], leve2_map), app_list_func(x[12], leve2_map),
# [value_map[x[0]], value_map[x[13]], value_map[x[14]], value_map[x[15]], value_map[x[16]],
# value_map[x[17]], value_map[x[18]], value_map[x[19]], value_map[x[20]], value_map[x[21]],
# value_map[x[22]], value_map[x[23]], value_map[x[24]], value_map[x[25]], value_map[x[26]]]))
rdd = df.select("stat_date", "y", "z", "app_list", "level2_ids", "level3_ids",
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
"hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
"maintain_time", "recover_time").rdd.repartition(200).map(
lambda x: (x[0], float(x[1]), float(x[2]), app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map),
app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map),
app_list_func(x[8], leve2_map), app_list_func(x[9], leve2_map), app_list_func(x[10], leve2_map),
app_list_func(x[11], leve2_map), app_list_func(x[12], leve2_map),
[value_map[x[0]], value_map[x[13]], value_map[x[14]], value_map[x[15]], value_map[x[16]],
value_map[x[17]], value_map[x[18]], value_map[x[19]], value_map[x[20]], value_map[x[21]],
value_map[x[22]], value_map[x[23]], value_map[x[24]], value_map[x[25]], value_map[x[26]]]))
"maintain_time", "recover_time").rdd.repartition(200)\
.mapPartitions(train_yingshe)
rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
......@@ -269,26 +305,33 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
df = df.drop_duplicates(["ucity_id", "device_id", "cid_id"])
df = df.na.fill(dict(zip(features, features)))
f = time.time()
# rdd = df.select("label", "y", "z", "ucity_id", "device_id", "cid_id", "app_list", "level2_ids", "level3_ids",
# "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
# "ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
# "hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
# "maintain_time", "recover_time") \
# .rdd.repartition(200).map(lambda x: (x[0], float(x[1]), float(x[2]), x[3], x[4], x[5],
# app_list_func(x[6], app_list_map), app_list_func(x[7], leve2_map),
# app_list_func(x[8], leve3_map), app_list_func(x[9], leve2_map),
# app_list_func(x[10], leve2_map), app_list_func(x[11], leve2_map),
# app_list_func(x[12], leve2_map), app_list_func(x[13], leve2_map),
# app_list_func(x[14], leve2_map), app_list_func(x[15], leve2_map),
# [value_map.get(date,1), value_map.get(x[16],2),
# value_map.get(x[17],3), value_map.get(x[18], 4),
# value_map.get(x[19], 5), value_map.get(x[20], 6),
# value_map.get(x[21], 7), value_map.get(x[22], 8),
# value_map.get(x[23], 9), value_map.get(x[24], 10),
# value_map.get(x[25], 11), value_map.get(x[26], 12),
# value_map.get(x[27], 13), value_map.get(x[28], 14),
# value_map.get(x[29], 15)
# ]))
rdd = df.select("label", "y", "z", "ucity_id", "device_id", "cid_id", "app_list", "level2_ids", "level3_ids",
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
"hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
"maintain_time", "recover_time") \
.rdd.repartition(200).map(lambda x: (x[0], float(x[1]), float(x[2]), x[3], x[4], x[5],
app_list_func(x[6], app_list_map), app_list_func(x[7], leve2_map),
app_list_func(x[8], leve3_map), app_list_func(x[9], leve2_map),
app_list_func(x[10], leve2_map), app_list_func(x[11], leve2_map),
app_list_func(x[12], leve2_map), app_list_func(x[13], leve2_map),
app_list_func(x[14], leve2_map), app_list_func(x[15], leve2_map),
[value_map.get(date,1), value_map.get(x[16],2),
value_map.get(x[17],3), value_map.get(x[18], 4),
value_map.get(x[19], 5), value_map.get(x[20], 6),
value_map.get(x[21], 7), value_map.get(x[22], 8),
value_map.get(x[23], 9), value_map.get(x[24], 10),
value_map.get(x[25], 11), value_map.get(x[26], 12),
value_map.get(x[27], 13), value_map.get(x[28], 14),
value_map.get(x[29], 15)
]))
.rdd.repartition(200).mapPartitions(pre_yingshe)
rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment