Commit 9eac3bf3 authored by 张彦钊's avatar 张彦钊

修改测试文件

parent a50dda9a
......@@ -17,15 +17,16 @@ def app_list_func(x,l):
e.append(0)
return ",".join([str(j) for j in e])
def multi_hot(df,column,n):
app_list_value = [i.split(",") for i in df.select(column).collect().unique()]
v = set(df.select(column).rdd.map(lambda x: x[0]).collect())
app_list_value = [i.split(",") for i in v]
app_list_unique = []
for i in app_list_value:
app_list_unique.extend(i)
app_list_unique = list(set(app_list_unique))
number = len(app_list_unique)
app_list_map = dict(zip(app_list_unique, list(range(n, number + n))))
df = df.select(column).apply(app_list_func, args=(app_list_map,))
return number,app_list_map
def feature_engineer():
......@@ -62,76 +63,69 @@ def feature_engineer():
df = spark.sql(sql)
url = "jdbc:mysql://172.16.30.143:3306/zhengxing"
jdbcDF = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \
.option("dbtable", "api_service").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load()
jdbcDF.createOrReplaceTempView("api_service")
jdbc = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \
.option("dbtable", "api_doctor").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load()
jdbc.createOrReplaceTempView("api_doctor")
sql = "select s.id as diary_service_id,d.hospital_id " \
"from api_service s left join api_doctor d on s.doctor_id = d.id"
hospital = spark.sql(sql)
df = df.join(hospital,"diary_service_id","left_outer").fillna("na")
df = df.drop("level2").drop("diary_service_id")
df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids"])
features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time"]
df = df.na.fill(dict(zip(features,features)))
apps_number, app_list_map = multi_hot(df,"app_list",1)
level2_number,leve2_map = multi_hot(df,"level2_ids",1 + apps_number)
level3_number, leve3_map = multi_hot(df, "level3_ids", 1 + apps_number + level2_number)
unique_values = []
for i in features:
unique_values.extend(list(set(df.select(i).rdd.map(lambda x: x[0]).collect())))
temp = list(range(2 + apps_number + level2_number + level3_number,
2 + apps_number + level2_number + level3_number + len(unique_values)))
value_map = dict(zip(unique_values, temp))
train = df.select("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id","treatment_method", "price_min",
"price_max", "treatment_time","maintain_time", "recover_time","y","z",)\
.rdd.filter(lambda x: x[3]!= validate_date).map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map),
app_list_func(x[2], leve3_map),value_map[x[3]],value_map[x[4]],
value_map[x[5]],value_map[x[6]],value_map[x[7]],value_map[x[8]],
value_map[x[9]],value_map[x[10]],value_map[x[11]],value_map[x[12]],
value_map[x[13]],value_map[x[14]],value_map[x[15]],value_map[x[16]],
value_map[x[17]], x[18],x[19]))
test = df.select("app_list", "level2_ids", "level3_ids", "stat_date", "ucity_id", "ccity_name", "device_type",
"manufacturer",
"channel", "top", "time", "hospital_id", "treatment_method", "price_min",
"price_max", "treatment_time", "maintain_time", "recover_time", "y", "z", ) \
.rdd.filter(lambda x: x[3] == validate_date).map(
lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map),
app_list_func(x[2], leve3_map), value_map[x[3]], value_map[x[4]],
value_map[x[5]], value_map[x[6]], value_map[x[7]], value_map[x[8]],
value_map[x[9]], value_map[x[10]], value_map[x[11]], value_map[x[12]],
value_map[x[13]], value_map[x[14]], value_map[x[15]], value_map[x[16]],
value_map[x[17]], x[18], x[19]))
spark.createDataFrame(test).show(6)
# url = "jdbc:mysql://172.16.30.143:3306/zhengxing"
# jdbcDF = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \
# .option("dbtable", "api_service").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load()
# jdbcDF.createOrReplaceTempView("api_service")
# jdbc = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \
# .option("dbtable", "api_doctor").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load()
# jdbc.createOrReplaceTempView("api_doctor")
#
# sql = "select s.id as diary_service_id,d.hospital_id " \
# "from api_service s left join api_doctor d on s.doctor_id = d.id"
# hospital = spark.sql(sql)
#
# df = df.join(hospital,"diary_service_id","left_outer").fillna("na")
# df = df.drop("level2").drop("diary_service_id")
# df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
# "channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids"])
#
df = df.fillna("na")
v = set(df.select("app_list").rdd.map(lambda x: x[0]).collect())
app_list_value = [i.split(",") for i in v]
app_list_unique = []
for i in app_list_value:
app_list_unique.extend(i)
app_list_unique = list(set(app_list_unique))
number = len(app_list_unique)
app_list_map = dict(zip(app_list_unique, list(range(1, number + 1))))
a = df.select("app_list","stat_date").rdd.map(lambda x:(app_list_func(x[0],app_list_map),x[1]))
spark.createDataFrame(a).show(6)
# app_list_number, app_list_map = multi_hot(df, "app_list", 2)
# level2_number, level2_map = multi_hot(df, "clevel2_id", 2 + app_list_number)
# level3_number, level3_map = multi_hot(df, "level3_ids", 2 + app_list_number + level2_number)
#
# unique_values = []
# features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
# "channel", "top", "time", "stat_date", "hospital_id",
# "method", "min", "max", "treatment_time", "maintain_time", "recover_time"]
# for i in features:
# df[i] = df[i].astype("str")
# df[i] = df[i].fillna("lost")
# # 下面这行代码是为了区分不同的列中有相同的值
# df[i] = df[i] + i
# unique_values.extend(list(df[i].unique()))
#
# temp = list(range(2 + app_list_number + level2_number + level3_number,
# 2 + app_list_number + level2_number + level3_number + len(unique_values)))
# value_map = dict(zip(unique_values, temp))
#
# df = df.drop("device_id", axis=1)
# train = df[df["stat_date"] != validate_date + "stat_date"]
# test = df[df["stat_date"] == validate_date + "stat_date"]
# for i in ["ucity_id", "ccity_name", "device_type", "manufacturer",
# "channel", "top", "time", "stat_date", "hospital_id",
# "method", "min", "max", "treatment_time", "maintain_time", "recover_time"]:
# train[i] = train[i].map(value_map)
# test[i] = test[i].map(value_map)
#
# print("train shape")
# print(train.shape)
# print("test shape")
# print(test.shape)
#
# write_csv(train, "tr", 100000)
# write_csv(test, "va", 80000)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment