Commit f212a78c authored by 张彦钊

Put the most recent day's data into the training set

parent 705a3a13
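
A minimal sketch of the intended change, assuming the training query in feature_engineer() previously stopped one day short of the validation date; esmm_train_data, stat_date and validate_date come from this file, but start_date and the exact filter are illustrative assumptions, not the committed code:

# hypothetical illustration only: widen the training window so the most recent
# day (validate_date) is also selected from esmm_train_data
train_sql = "select * from esmm_train_data " \
    "where stat_date >= '{}' and stat_date <= '{}'".format(start_date, validate_date)
train_df = spark.sql(train_sql)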
@@ -114,71 +114,71 @@ def feature_engineer():
return validate_date,value_map,app_list_map,leve2_map,leve3_map
def get_predict(date,value_map,app_list_map,level2_map,level3_map):
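# Build the prediction set: join exposure rows (esmm_pre_data) with user, cid-top,
# time-bucket, app-list, diary-tag and knowledge-network features, encode the
# categorical columns through the vocabulary maps built in feature_engineer(),
# and write native/nearby candidate files to HDFS.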
sql = "select e.y,e.z,e.label,e.ucity_id,feat.level2_ids,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \
"dl.app_list,e.hospital_id,feat.level3_ids," \
"k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \
"from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_time_cut cut on e.cid_id = cut.cid " \
"left join device_app_list dl on e.device_id = dl.device_id " \
"left join diary_feat feat on e.cid_id = feat.diary_id " \
"left join train_Knowledge_network_data k on feat.level2 = k.level2_id"
features = ["app_list", "level2_ids", "level3_ids","ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time"]
df = spark.sql(sql)
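# dict(zip(features, features)) fills nulls in each feature column with the column's
# own name as a placeholder string before encoding.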
df = df.na.fill(dict(zip(features, features)))
df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
"device_id","cid,id","label",
"channel", "top", "time", "app_list", "hospital_id", "level3_ids"])
rdd = df.select("app_list", "level2_ids", "level3_ids","ucity_id","device_id","cid_id","label", "y", "z",
"ccity_name", "device_type","manufacturer", "channel", "top", "time", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
"recover_time") \
.rdd.map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], level2_map),
app_list_func(x[2], level3_map), x[3],x[4],x[5],x[6],x[7],x[8],
value_map[x[3]], value_map[x[9]],
value_map[x[10]], value_map[x[11]], value_map[x[12]], value_map[x[13]],
value_map[x[14]], value_map[x[15]], value_map[x[16]], value_map[x[17]],
value_map[x[18]], value_map[x[19]], value_map[x[20]], value_map[x[21]],
value_map[date]))
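# app_list_func is assumed to split the comma-separated id string and look each id up
# in the given vocabulary map; value_map encodes the remaining categorical values
# (and the prediction date) as ids from the shared vocabulary.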
rdd.persist()
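# rows with label == 0 (x[6]) form the "native" candidate set, label == 1 the "nearby"
# set; the rdd is persisted because the four filter/map passes below each rescan it.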
native_pre = spark.createDataFrame(rdd.filter(lambda x:x[6] == 0).map(lambda x:(x[3],x[4],x[5])))\
.toDF("city","uid","cid_id")
print("native")
print(native_pre.count())
native_pre.write.csv('/recommend', mode='overwrite', header=True)
spark.createDataFrame(rdd.filter(lambda x: x[6] == 0)
.map(lambda x: (x[0], x[1], x[2],x[9],x[10],x[11],x[12],x[13],x[14],x[15],
x[16],x[17],x[18],x[19],x[20],x[21],x[22],x[23]))) \
.toDF("app_list", "level2_ids", "level3_ids","ucity_id",
"ccity_name", "device_type","manufacturer", "channel", "top", "time", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
"recover_time", "stat_date").write.csv('/recommend/native', mode='overwrite', header=True)
nearby_pre = spark.createDataFrame(rdd.filter(lambda x: x[6] == 1).map(lambda x: (x[3], x[4], x[5]))) \
.toDF("city", "uid", "cid_id")
print("nearby")
print(nearby_pre.count())
nearby_pre.write.csv('/recommend', mode='overwrite', header=True)
spark.createDataFrame(rdd.filter(lambda x: x[6] == 1)
.map(lambda x: (x[0], x[1], x[2], x[9], x[10], x[11], x[12], x[13], x[14], x[15],
x[16], x[17], x[18], x[19], x[20], x[21], x[22], x[23]))) \
.toDF("app_list", "level2_ids", "level3_ids", "ucity_id",
"ccity_name", "device_type", "manufacturer", "channel", "top", "time", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
"recover_time","stat_date").write.csv('/recommend/nearby', mode='overwrite', header=True)
rdd.unpersist()
# def get_predict(date,value_map,app_list_map,level2_map,level3_map):
# sql = "select e.y,e.z,e.label,e.ucity_id,feat.level2_ids,e.ccity_name," \
# "u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \
# "dl.app_list,e.hospital_id,feat.level3_ids," \
# "k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \
# "from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
# "left join cid_type_top c on e.device_id = c.device_id " \
# "left join cid_time_cut cut on e.cid_id = cut.cid " \
# "left join device_app_list dl on e.device_id = dl.device_id " \
# "left join diary_feat feat on e.cid_id = feat.diary_id " \
# "left join train_Knowledge_network_data k on feat.level2 = k.level2_id"
#
# features = ["app_list", "level2_ids", "level3_ids","ucity_id", "ccity_name", "device_type", "manufacturer",
# "channel", "top", "time", "hospital_id",
# "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time"]
# df = spark.sql(sql)
#
# df = df.na.fill(dict(zip(features, features)))
# df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
# "device_id","cid,id","label",
# "channel", "top", "time", "app_list", "hospital_id", "level3_ids"])
#
# rdd = df.select("app_list", "level2_ids", "level3_ids","ucity_id","device_id","cid_id","label", "y", "z",
# "ccity_name", "device_type","manufacturer", "channel", "top", "time", "hospital_id",
# "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
# "recover_time") \
# .rdd.map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], level2_map),
# app_list_func(x[2], level3_map), x[3],x[4],x[5],x[6],x[7],x[8],
# value_map[x[3]], value_map[x[9]],
# value_map[x[10]], value_map[x[11]], value_map[x[12]], value_map[x[13]],
# value_map[x[14]], value_map[x[15]], value_map[x[16]], value_map[x[17]],
# value_map[x[18]], value_map[x[19]], value_map[x[20]], value_map[x[21]],
# value_map[date]))
#
# rdd.persist()
#
# native_pre = spark.createDataFrame(rdd.filter(lambda x:x[6] == 0).map(lambda x:(x[3],x[4],x[5])))\
# .toDF("city","uid","cid_id")
# print("native")
# print(native_pre.count())
# native_pre.write.csv('/recommend', mode='overwrite', header=True)
#
# spark.createDataFrame(rdd.filter(lambda x: x[6] == 0)
# .map(lambda x: (x[0], x[1], x[2],x[9],x[10],x[11],x[12],x[13],x[14],x[15],
# x[16,x[17],x[18],x[19],x[20],x[21],x[22],x[23]]))) \
# .toDF("app_list", "level2_ids", "level3_ids","ucity_id",
# "ccity_name", "device_type","manufacturer", "channel", "top", "time", "hospital_id",
# "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
# "recover_time", "stat_date").write.csv('/recommend/native', mode='overwrite', header=True)
#
# nearby_pre = spark.createDataFrame(rdd.filter(lambda x: x[6] == 1).map(lambda x: (x[3], x[4], x[5]))) \
# .toDF("city", "uid", "cid_id")
# print("nearby")
# print(nearby_pre.count())
# nearby_pre.write.csv('/recommend', mode='overwrite', header=True)
#
# spark.createDataFrame(rdd.filter(lambda x: x[6] == 1)
# .map(lambda x: (x[0], x[1], x[2], x[9], x[10], x[11], x[12], x[13], x[14], x[15],
# x[16, x[17], x[18], x[19], x[20], x[21], x[22], x[23]]))) \
# .toDF("app_list", "level2_ids", "level3_ids", "ucity_id",
# "ccity_name", "device_type", "manufacturer", "channel", "top", "time", "hospital_id",
# "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
# "recover_time","stat_date").write.csv('/recommend/nearby', mode='overwrite', header=True)
#
# rdd.unpersist()
def con_sql(db,sql):
@@ -191,13 +191,19 @@ def con_sql(db,sql):
def test():
spark.sql("use online")
spark.sql("ADD JAR /srv/apps/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR /srv/apps/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
sql = "select stat_date,cid_id from esmm_train_data e where stat_date >= '{}'".format("2019-04-25")
df = spark.createDataFrame(spark.sql(sql).rdd.map(lambda x:(x[0],x[1])).zipWithIndex()
.map(lambda x:(x[1],x[0][0],x[0][1]))).toDF("ind","k","v")
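# zipWithIndex pairs each (stat_date, cid_id) row with a sequential index; the second
# map flattens that to (index, stat_date, cid_id), i.e. the (ind, k, v) columns.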
df.show(6)
df.write.csv('/recommend/test', mode='overwrite', header=True)
spark.sql("select cl_type from online.tl_hdfs_maidian_view where partition_date = '20190312' limit 6").show()
# spark.sql("use online")
# spark.sql("ADD JAR /srv/apps/brickhouse-0.7.1-SNAPSHOT.jar")
# spark.sql("ADD JAR /srv/apps/hive-udf-1.0-SNAPSHOT.jar")
# spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
# spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
#
# spark.sql("select cl_type from online.tl_hdfs_maidian_view where partition_date = '20190312' limit 6").show()
@@ -229,7 +235,8 @@ if __name__ == '__main__':
ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test")
spark.sparkContext.setLogLevel("WARN")
validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer()
get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map)
test()
# validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer()
# get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map)