Commit 6dabcb31 authored by 张彦钊's avatar 张彦钊

修改测试文件

parent 4bc8ff16
...@@ -36,7 +36,7 @@ def feature_engineer(): ...@@ -36,7 +36,7 @@ def feature_engineer():
validate_date = con_sql(db, sql)[0].values.tolist()[0] validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date) print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d") temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=2)).strftime("%Y-%m-%d") start = (temp - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
print(start) print(start)
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \ sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
...@@ -51,67 +51,68 @@ def feature_engineer(): ...@@ -51,67 +51,68 @@ def feature_engineer():
"where e.stat_date >= '{}'".format(start) "where e.stat_date >= '{}'".format(start)
df = spark.sql(sql) df = spark.sql(sql)
df.write.csv('/recommend/va', mode='overwrite', header=True)
url = "jdbc:mysql://172.16.30.143:3306/zhengxing"
jdbcDF = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \ # url = "jdbc:mysql://172.16.30.143:3306/zhengxing"
.option("dbtable", "api_service").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load() # jdbcDF = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \
jdbcDF.createOrReplaceTempView("api_service") # .option("dbtable", "api_service").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load()
jdbc = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \ # jdbcDF.createOrReplaceTempView("api_service")
.option("dbtable", "api_doctor").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load() # jdbc = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \
jdbc.createOrReplaceTempView("api_doctor") # .option("dbtable", "api_doctor").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load()
# jdbc.createOrReplaceTempView("api_doctor")
sql = "select s.id as diary_service_id,d.hospital_id " \ #
"from api_service s left join api_doctor d on s.doctor_id = d.id" # sql = "select s.id as diary_service_id,d.hospital_id " \
hospital = spark.sql(sql) # "from api_service s left join api_doctor d on s.doctor_id = d.id"
# hospital = spark.sql(sql)
df = df.join(hospital,"diary_service_id","left_outer").fillna("na") #
df = df.drop("level2").drop("diary_service_id") # df = df.join(hospital,"diary_service_id","left_outer").fillna("na")
df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer", # df = df.drop("level2").drop("diary_service_id")
"channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids"]) # df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
# "channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids"])
features = ["ucity_id", "ccity_name", "device_type", "manufacturer", #
"channel", "top", "time", "stat_date", "hospital_id", # features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time"] # "channel", "top", "time", "stat_date", "hospital_id",
# "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time"]
df = df.na.fill(dict(zip(features,features))) #
# df = df.na.fill(dict(zip(features,features)))
apps_number, app_list_map = multi_hot(df,"app_list",1) #
level2_number,leve2_map = multi_hot(df,"level2_ids",1 + apps_number) # apps_number, app_list_map = multi_hot(df,"app_list",1)
level3_number, leve3_map = multi_hot(df, "level3_ids", 1 + apps_number + level2_number) # level2_number,leve2_map = multi_hot(df,"level2_ids",1 + apps_number)
# level3_number, leve3_map = multi_hot(df, "level3_ids", 1 + apps_number + level2_number)
unique_values = [] #
for i in features: # unique_values = []
unique_values.extend(list(set(df.select(i).rdd.map(lambda x: x[0]).collect()))) # for i in features:
temp = list(range(2 + apps_number + level2_number + level3_number, # unique_values.extend(list(set(df.select(i).rdd.map(lambda x: x[0]).collect())))
2 + apps_number + level2_number + level3_number + len(unique_values))) # temp = list(range(2 + apps_number + level2_number + level3_number,
value_map = dict(zip(unique_values, temp)) # 2 + apps_number + level2_number + level3_number + len(unique_values)))
# value_map = dict(zip(unique_values, temp))
train = df.select("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer", #
"channel", "top", "time", "hospital_id","treatment_method", "price_min", # train = df.select("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer",
"price_max", "treatment_time","maintain_time", "recover_time","y","z",)\ # "channel", "top", "time", "hospital_id","treatment_method", "price_min",
.rdd.filter(lambda x: x[3]!= validate_date).map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map), # "price_max", "treatment_time","maintain_time", "recover_time","y","z",)\
app_list_func(x[2], leve3_map),value_map[x[3]],value_map[x[4]], # .rdd.filter(lambda x: x[3]!= validate_date).map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map),
value_map[x[5]],value_map[x[6]],value_map[x[7]],value_map[x[8]], # app_list_func(x[2], leve3_map),value_map[x[3]],value_map[x[4]],
value_map[x[9]],value_map[x[10]],value_map[x[11]],value_map[x[12]], # value_map[x[5]],value_map[x[6]],value_map[x[7]],value_map[x[8]],
value_map[x[13]],value_map[x[14]],value_map[x[15]],value_map[x[16]], # value_map[x[9]],value_map[x[10]],value_map[x[11]],value_map[x[12]],
value_map[x[17]], x[18],x[19])) # value_map[x[13]],value_map[x[14]],value_map[x[15]],value_map[x[16]],
test = df.select("app_list", "level2_ids", "level3_ids", "stat_date", "ucity_id", "ccity_name", "device_type", # value_map[x[17]], x[18],x[19]))
"manufacturer","channel", "top", "time", "hospital_id", "treatment_method", "price_min", # test = df.select("app_list", "level2_ids", "level3_ids", "stat_date", "ucity_id", "ccity_name", "device_type",
"price_max", "treatment_time", "maintain_time", "recover_time", "y", "z", ) \ # "manufacturer","channel", "top", "time", "hospital_id", "treatment_method", "price_min",
.rdd.filter(lambda x: x[3] == validate_date)\ # "price_max", "treatment_time", "maintain_time", "recover_time", "y", "z", ) \
.map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map), # .rdd.filter(lambda x: x[3] == validate_date)\
app_list_func(x[2], leve3_map), value_map[x[3]], value_map[x[4]], # .map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map),
value_map[x[5]], value_map[x[6]], value_map[x[7]], value_map[x[8]], # app_list_func(x[2], leve3_map), value_map[x[3]], value_map[x[4]],
value_map[x[9]], value_map[x[10]], value_map[x[11]], value_map[x[12]], # value_map[x[5]], value_map[x[6]], value_map[x[7]], value_map[x[8]],
value_map[x[13]], value_map[x[14]], value_map[x[15]], value_map[x[16]], # value_map[x[9]], value_map[x[10]], value_map[x[11]], value_map[x[12]],
value_map[x[17]], x[18], x[19])) # value_map[x[13]], value_map[x[14]], value_map[x[15]], value_map[x[16]],
# value_map[x[17]], x[18], x[19]))
print("test.count",test.count())
print("train count",train.count()) # print("test.count",test.count())
# print("train count",train.count())
spark.createDataFrame(test).write.csv('/recommend/va', mode='overwrite', header=True)
spark.createDataFrame(train).write.csv('/recommend/tr', mode='overwrite', header=True) # spark.createDataFrame(test).write.csv('/recommend/va', mode='overwrite', header=True)
print("done") # spark.createDataFrame(train).write.csv('/recommend/tr', mode='overwrite', header=True)
# print("done")
return validate_date,value_map,app_list_map,leve2_map,leve3_map return validate_date,value_map,app_list_map,leve2_map,leve3_map
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment