Commit 7c880884 authored by 王志伟's avatar 王志伟
parents aa7db93b a3b3de0c
......@@ -9,7 +9,7 @@ import pandas as pd
def app_list_func(x,l):
b = x.split(",")
b = str(x).split(",")
e = []
for i in b:
if i in l.keys():
......@@ -22,7 +22,7 @@ def app_list_func(x,l):
def multi_hot(df,column,n):
v = df.select(column).distinct().rdd.map(lambda x: x[0]).collect()
app_list_value = [i.split(",") for i in v]
app_list_value = [str(i).split(",") for i in v]
app_list_unique = []
for i in app_list_value:
app_list_unique.extend(i)
......@@ -42,33 +42,20 @@ def feature_engineer():
print(start)
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
"u.channel,c.top,cut.time,dl.app_list,e.diary_service_id,feat.level3_ids," \
"u.channel,c.top,cut.time,dl.app_list,feat.level3_ids,doctor.hospital_id," \
"k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \
"from esmm_train_data e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_time_cut cut on e.cid_id = cut.cid " \
"left join device_app_list dl on e.device_id = dl.device_id " \
"left join diary_feat feat on e.cid_id = feat.diary_id " \
"left join train_Knowledge_network_data k on feat.level2 = k.level2_id " \
"from jerry_test.esmm_train_data e left join jerry_test.user_feature u on e.device_id = u.device_id " \
"left join jerry_test.cid_type_top c on e.device_id = c.device_id " \
"left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \
"left join jerry_test.device_app_list dl on e.device_id = dl.device_id " \
"left join jerry_test.diary_feat feat on e.cid_id = feat.diary_id " \
"left join jerry_test.train_Knowledge_network_data k on feat.level2 = k.level2_id " \
"left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \
"left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \
"where e.stat_date >= '{}'".format(start)
df = spark.sql(sql)
# TODO 把下面的库改成tidb的数据库
url = "jdbc:mysql://172.16.30.143:3306/zhengxing"
jdbcDF = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \
.option("dbtable", "api_service").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load()
jdbcDF.createOrReplaceTempView("api_service")
jdbc = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", url) \
.option("dbtable", "api_doctor").option("user", 'work').option("password", 'BJQaT9VzDcuPBqkd').load()
jdbc.createOrReplaceTempView("api_doctor")
sql = "select s.id as diary_service_id,d.hospital_id " \
"from api_service s left join api_doctor d on s.doctor_id = d.id"
hospital = spark.sql(sql)
df = df.join(hospital,"diary_service_id","left_outer").fillna("na")
df = df.drop("diary_service_id")
df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids"])
......@@ -113,13 +100,13 @@ def feature_engineer():
spark.createDataFrame(test).toDF("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id","treatment_method", "price_min",
"price_max", "treatment_time","maintain_time", "recover_time","y","z")\
.write.format("tfrecords").option("recordType", "Example").save(path=path+"va/", mode="overwrite")
.repartition(1).write.format("tfrecords").option("recordType", "Example").save(path=path+"va/", mode="overwrite")
print("va write done")
spark.createDataFrame(train).toDF("app_list","level2_ids","level3_ids","stat_date","ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id","treatment_method", "price_min",
"price_max", "treatment_time","maintain_time", "recover_time","y","z")\
.write.format("tfrecords").option("recordType", "Example").save(path=path+"tr/", mode="overwrite")
"price_max", "treatment_time","maintain_time", "recover_time","y","z") \
.repartition(1).write.format("tfrecords").option("recordType", "Example").save(path=path+"tr/", mode="overwrite")
print("done")
rdd.unpersist()
......@@ -178,7 +165,7 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
.toDF("app_list", "level2_ids", "level3_ids","y","z","ucity_id",
"ccity_name", "device_type","manufacturer", "channel", "time", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
"recover_time", "top","stat_date").write.format("tfrecords").option("recordType", "Example")\
"recover_time", "top","stat_date").repartition(1).write.format("tfrecords").option("recordType", "Example") \
.save(path=path+"native/", mode="overwrite")
nearby_pre = spark.createDataFrame(rdd.filter(lambda x: x[6] == 1).map(lambda x: (x[3], x[4], x[5]))) \
......@@ -194,7 +181,7 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
.toDF("app_list", "level2_ids", "level3_ids","y","z", "ucity_id",
"ccity_name", "device_type", "manufacturer", "channel", "time", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
"recover_time","top","stat_date").write.format("tfrecords").option("recordType", "Example")\
"recover_time","top","stat_date").repartition(1).write.format("tfrecords").option("recordType", "Example") \
.save(path=path+"nearby/", mode="overwrite")
rdd.unpersist()
......@@ -235,7 +222,7 @@ if __name__ == '__main__':
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test")
# ti.tidbMapDatabase("eagle")
ti.tidbMapDatabase("eagle")
spark.sparkContext.setLogLevel("WARN")
path = "hdfs:///strategy/esmm/"
local_path = "/home/gmuser/test/"
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment