Commit 1b9c29c6 authored by 张彦钊's avatar 张彦钊

change test file

parent a6121a48
...@@ -4,6 +4,7 @@ from pyspark.conf import SparkConf ...@@ -4,6 +4,7 @@ from pyspark.conf import SparkConf
import pytispark.pytispark as pti import pytispark.pytispark as pti
# from pyspark.sql import SQLContext # from pyspark.sql import SQLContext
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
from pyspark.sql.functions import _lit_doc
import datetime import datetime
import pandas as pd import pandas as pd
...@@ -36,7 +37,7 @@ def feature_engineer(): ...@@ -36,7 +37,7 @@ def feature_engineer():
validate_date = con_sql(db, sql)[0].values.tolist()[0] validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date) print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d") temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=6)).strftime("%Y-%m-%d") start = (temp - datetime.timedelta(days=3)).strftime("%Y-%m-%d")
print(start) print(start)
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \ sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
...@@ -65,7 +66,7 @@ def feature_engineer(): ...@@ -65,7 +66,7 @@ def feature_engineer():
hospital = spark.sql(sql) hospital = spark.sql(sql)
df = df.join(hospital,"diary_service_id","left_outer").fillna("na") df = df.join(hospital,"diary_service_id","left_outer").fillna("na")
df = df.drop("level2").drop("diary_service_id") df = df.drop("diary_service_id")
df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer", df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids"]) "channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids"])
...@@ -86,19 +87,18 @@ def feature_engineer(): ...@@ -86,19 +87,18 @@ def feature_engineer():
2 + apps_number + level2_number + level3_number + len(unique_values))) 2 + apps_number + level2_number + level3_number + len(unique_values)))
value_map = dict(zip(unique_values, temp)) value_map = dict(zip(unique_values, temp))
train = df.select("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer", rdd = df.select("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id","treatment_method", "price_min", "channel", "top", "time", "hospital_id","treatment_method", "price_min",
"price_max", "treatment_time","maintain_time", "recover_time","y","z",)\ "price_max", "treatment_time","maintain_time", "recover_time","y","z",).rdd
.rdd.filter(lambda x: x[3]!= validate_date).map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map), rdd.persist()
train = rdd.filter(lambda x: x[3]!= validate_date).map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map),
app_list_func(x[2], leve3_map),value_map[x[3]],value_map[x[4]], app_list_func(x[2], leve3_map),value_map[x[3]],value_map[x[4]],
value_map[x[5]],value_map[x[6]],value_map[x[7]],value_map[x[8]], value_map[x[5]],value_map[x[6]],value_map[x[7]],value_map[x[8]],
value_map[x[9]],value_map[x[10]],value_map[x[11]],value_map[x[12]], value_map[x[9]],value_map[x[10]],value_map[x[11]],value_map[x[12]],
value_map[x[13]],value_map[x[14]],value_map[x[15]],value_map[x[16]], value_map[x[13]],value_map[x[14]],value_map[x[15]],value_map[x[16]],
value_map[x[17]], x[18],x[19])) value_map[x[17]], x[18],x[19]))
test = df.select("app_list", "level2_ids", "level3_ids", "stat_date", "ucity_id", "ccity_name", "device_type", test = rdd.filter(lambda x: x[3] == validate_date)\
"manufacturer","channel", "top", "time", "hospital_id", "treatment_method", "price_min",
"price_max", "treatment_time", "maintain_time", "recover_time", "y", "z", ) \
.rdd.filter(lambda x: x[3] == validate_date)\
.map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map), .map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map),
app_list_func(x[2], leve3_map), value_map[x[3]], value_map[x[4]], app_list_func(x[2], leve3_map), value_map[x[3]], value_map[x[4]],
value_map[x[5]], value_map[x[6]], value_map[x[7]], value_map[x[8]], value_map[x[5]], value_map[x[6]], value_map[x[7]], value_map[x[8]],
...@@ -106,95 +106,80 @@ def feature_engineer(): ...@@ -106,95 +106,80 @@ def feature_engineer():
value_map[x[13]], value_map[x[14]], value_map[x[15]], value_map[x[16]], value_map[x[13]], value_map[x[14]], value_map[x[15]], value_map[x[16]],
value_map[x[17]], x[18], x[19])) value_map[x[17]], x[18], x[19]))
print("test.count",test.count())
print("train count",train.count())
spark.createDataFrame(test).write.csv('/recommend/va', mode='overwrite', header=True) spark.createDataFrame(test).write.csv('/recommend/va', mode='overwrite', header=True)
spark.createDataFrame(train).write.csv('/recommend/tr', mode='overwrite', header=True) spark.createDataFrame(train).write.csv('/recommend/tr', mode='overwrite', header=True)
print("done") print("done")
rdd.unpersist()
return validate_date,value_map,app_list_map,leve2_map,leve3_map return validate_date,value_map,app_list_map,leve2_map,leve3_map
# def get_predict(date,value_map,app_list_map,level2_map,level3_map): def get_predict(date,value_map,app_list_map,level2_map,level3_map):
# sql = "select e.y,e.z,e.label,e.ucity_id,feat.level2_ids,e.ccity_name," \
# sql = "select e.y,e.z,e.label,e.ucity_id,feat.level2_ids,e.ccity_name," \ "u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \
# "u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \ "dl.app_list,e.hospital_id,feat.level3_ids," \
# "dl.app_list,e.hospital_id,feat.level3_ids,feat.level2 " \ "k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \
# "from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \ "from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
# "left join cid_type_top c on e.device_id = c.device_id " \ "left join cid_type_top c on e.device_id = c.device_id " \
# "left join cid_time_cut cut on e.cid_id = cut.cid " \ "left join cid_time_cut cut on e.cid_id = cut.cid " \
# "left join device_app_list dl on e.device_id = dl.device_id " \ "left join device_app_list dl on e.device_id = dl.device_id " \
# "left join diary_feat feat on e.cid_id = feat.diary_id" "left join diary_feat feat on e.cid_id = feat.diary_id " \
# "left join train_Knowledge_network_data k on feat.level2 = k.level2_id"
#
# df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name", features = ["app_list", "level2_ids", "level3_ids","ucity_id", "ccity_name", "device_type", "manufacturer",
# 6: "device_type", 7: "manufacturer", 8: "channel", 9: "top",10: "device_id", "channel", "top", "time", "hospital_id",
# 11: "cid_id", 12: "time",13:"app_list",14:"hospital_id",15:"level3_ids", "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time"]
# 16: "level2"}) df = spark.sql(sql)
#
# db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') df = df.na.fill(dict(zip(features, features)))
# sql = "select level2_id,treatment_method,price_min,price_max,treatment_time,maintain_time,recover_time " \ df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
# "from train_Knowledge_network_data" "device_id","cid,id","label",
# knowledge = con_sql(db, sql) "channel", "top", "time", "app_list", "hospital_id", "level3_ids"])
# knowledge = knowledge.rename(columns={0: "level2", 1: "method", 2: "min", 3: "max",
# 4: "treatment_time", 5: "maintain_time", 6: "recover_time"}) rdd = df.select("app_list", "level2_ids", "level3_ids","ucity_id","device_id","cid_id","label", "y", "z",
# knowledge["level2"] = knowledge["level2"].astype("str") "ccity_name", "device_type","manufacturer", "channel", "top", "time", "hospital_id",
# "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
# df = pd.merge(df, knowledge, on='level2', how='left') "recover_time") \
# df = df.drop("level2", axis=1) .rdd.map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], level2_map),
# df = df.drop_duplicates(["ucity_id", "clevel2_id", "ccity_name", "device_type", "manufacturer", app_list_func(x[2], level3_map), x[3],x[4],x[5],x[6],x[7],x[8],
# "channel", "top", "time", "app_list", "hospital_id", "level3_ids"]) value_map[x[3]], value_map[x[9]],
# value_map[x[10]], value_map[x[11]], value_map[x[12]], value_map[x[13]],
# value_map[x[14]], value_map[x[15]], value_map[x[16]], value_map[x[17]],
# df["stat_date"] = date value_map[x[18]], value_map[x[19]], value_map[x[20]], value_map[x[21]],
# print(df.head(6)) value_map[date]))
# df["app_list"] = df["app_list"].fillna("lost_na")
# df["app_list"] = df["app_list"].apply(app_list_func,args=(app_list_map,)) rdd.persist()
# df["clevel2_id"] = df["clevel2_id"].fillna("lost_na")
# df["clevel2_id"] = df["clevel2_id"].apply(app_list_func, args=(level2_map,)) native_pre = spark.createDataFrame(rdd.filter(lambda x:x[6] == 0).map(lambda x:(x[3],x[4],x[5])))\
# df["level3_ids"] = df["level3_ids"].fillna("lost_na") .toDF("city","uid","cid_id")
# df["level3_ids"] = df["level3_ids"].apply(app_list_func, args=(level3_map,)) print("native")
# print(native_pre.count())
# # print("predict shape") native_pre.write.csv('/recommend', mode='overwrite', header=True)
# # print(df.shape)
# df["uid"] = df["device_id"] spark.createDataFrame(rdd.filter(lambda x: x[6] == 0)
# df["city"] = df["ucity_id"] .map(lambda x: (x[0], x[1], x[2],x[9],x[10],x[11],x[12],x[13],x[14],x[15],
# features = ["ucity_id", "ccity_name", "device_type", "manufacturer", x[16,x[17],x[18],x[19],x[20],x[21],x[22],x[23]]))) \
# "channel", "top", "time", "stat_date","hospital_id", .toDF("app_list", "level2_ids", "level3_ids","ucity_id",
# "method", "min", "max", "treatment_time", "maintain_time", "recover_time"] "ccity_name", "device_type","manufacturer", "channel", "top", "time", "hospital_id",
# for i in features: "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
# df[i] = df[i].astype("str") "recover_time", "stat_date").write.csv('/recommend/native', mode='overwrite', header=True)
# df[i] = df[i].fillna("lost")
# df[i] = df[i] + i nearby_pre = spark.createDataFrame(rdd.filter(lambda x: x[6] == 1).map(lambda x: (x[3], x[4], x[5]))) \
# .toDF("city", "uid", "cid_id")
# native_pre = df[df["label"] == 0] print("nearby")
# native_pre = native_pre.drop("label", axis=1) print(nearby_pre.count())
# nearby_pre = df[df["label"] == 1] nearby_pre.write.csv('/recommend', mode='overwrite', header=True)
# nearby_pre = nearby_pre.drop("label", axis=1)
# spark.createDataFrame(rdd.filter(lambda x: x[6] == 1)
# for i in ["ucity_id", "ccity_name", "device_type", "manufacturer", .map(lambda x: (x[0], x[1], x[2], x[9], x[10], x[11], x[12], x[13], x[14], x[15],
# "channel", "top", "time", "stat_date","hospital_id", x[16, x[17], x[18], x[19], x[20], x[21], x[22], x[23]]))) \
# "method", "min", "max", "treatment_time", "maintain_time", "recover_time"]: .toDF("app_list", "level2_ids", "level3_ids", "ucity_id",
# native_pre[i] = native_pre[i].map(value_map) "ccity_name", "device_type", "manufacturer", "channel", "top", "time", "hospital_id",
# # TODO 没有覆盖到的类别会处理成na,暂时用0填充,后续完善一下 "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
# native_pre[i] = native_pre[i].fillna(0) "recover_time","stat_date").write.csv('/recommend/nearby', mode='overwrite', header=True)
#
# nearby_pre[i] = nearby_pre[i].map(value_map) rdd.unpersist()
# # TODO 没有覆盖到的类别会处理成na,暂时用0填充,后续完善一下
# nearby_pre[i] = nearby_pre[i].fillna(0)
#
# print("native")
# print(native_pre.shape)
#
# native_pre[["uid","city","cid_id"]].to_csv(path+"native.csv",index=False)
# write_csv(native_pre, "native",200000)
#
# print("nearby")
# print(nearby_pre.shape)
#
# nearby_pre[["uid","city","cid_id"]].to_csv(path+"nearby.csv",index=False)
# write_csv(nearby_pre, "nearby", 160000)
def con_sql(db,sql): def con_sql(db,sql):
cursor = db.cursor() cursor = db.cursor()
...@@ -244,5 +229,7 @@ if __name__ == '__main__': ...@@ -244,5 +229,7 @@ if __name__ == '__main__':
ti = pti.TiContext(spark) ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test") ti.tidbMapDatabase("jerry_test")
spark.sparkContext.setLogLevel("WARN") spark.sparkContext.setLogLevel("WARN")
# feature_engineer() validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer()
test() get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment