Commit dc7f3d96 authored by Your Name's avatar Your Name

change predict and sort process

parents 7dd548c6 6219b3d8
...@@ -89,6 +89,19 @@ def get_pre_number(): ...@@ -89,6 +89,19 @@ def get_pre_number():
def feature_engineer(): def feature_engineer():
apps_number, app_list_map, level2_number, leve2_map, level3_number, leve3_map = get_map() apps_number, app_list_map, level2_number, leve2_map, level3_number, leve3_map = get_map()
app_list_map["app_list"] = 16
leve3_map["level3_ids"] = 17
leve3_map["search_tag3"] = 18
leve2_map["level2_ids"] = 19
leve2_map["tag1"] = 20
leve2_map["tag2"] = 21
leve2_map["tag3"] = 22
leve2_map["tag4"] = 23
leve2_map["tag5"] = 24
leve2_map["tag6"] = 25
leve2_map["tag7"] = 26
leve2_map["search_tag2"] = 27
unique_values = [] unique_values = []
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct stat_date from esmm_train_data_dwell" sql = "select distinct stat_date from esmm_train_data_dwell"
...@@ -162,22 +175,25 @@ def feature_engineer(): ...@@ -162,22 +175,25 @@ def feature_engineer():
unique_values.extend(get_unique(db, sql)) unique_values.extend(get_unique(db, sql))
features = ["ucity_id", "ccity_name", "device_type", "manufacturer", features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "hospital_id", "channel", "top", "time", "stat_date", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time"] "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time",
"app_list", "level3_ids", "level2_ids", "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"search_tag2", "search_tag3"]
unique_values.extend(features) unique_values.extend(features)
print("unique_values length") print("unique_values length")
print(len(unique_values)) print(len(unique_values))
print("特征维度:") print("特征维度:")
print(apps_number + level2_number + level3_number + len(unique_values)) print(apps_number + level2_number + level3_number + len(unique_values))
temp = list(range(16 + apps_number + level2_number + level3_number, temp = list(range(28 + apps_number + level2_number + level3_number,
16 + apps_number + level2_number + level3_number + len(unique_values))) 28 + apps_number + level2_number + level3_number + len(unique_values)))
value_map = dict(zip(unique_values, temp)) value_map = dict(zip(unique_values, temp))
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \ sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
"u.channel,c.top,cut.time,dl.app_list,feat.level3_ids,doctor.hospital_id," \ "u.channel,c.top,cut.time,dl.app_list,feat.level3_ids,doctor.hospital_id," \
"wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \ "wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \
"ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7," \ "ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7,doris.search_tag2,doris.search_tag3," \
"k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \ "k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time," \
"e.device_id,e.cid_id " \
"from jerry_test.esmm_train_data_dwell e left join jerry_test.user_feature u on e.device_id = u.device_id " \ "from jerry_test.esmm_train_data_dwell e left join jerry_test.user_feature u on e.device_id = u.device_id " \
"left join jerry_test.cid_type_top c on e.device_id = c.device_id " \ "left join jerry_test.cid_type_top c on e.device_id = c.device_id " \
"left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \ "left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \
...@@ -193,6 +209,7 @@ def feature_engineer(): ...@@ -193,6 +209,7 @@ def feature_engineer():
"left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \ "left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \
"left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \ "left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \
"left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \ "left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \
"left join jerry_test.search_doris doris on e.device_id = doris.device_id and e.stat_date = doris.get_date " \
"where e.stat_date >= '{}'".format(start) "where e.stat_date >= '{}'".format(start)
df = spark.sql(sql) df = spark.sql(sql)
...@@ -207,16 +224,20 @@ def feature_engineer(): ...@@ -207,16 +224,20 @@ def feature_engineer():
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7", "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time", "ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
"hospital_id", "treatment_method", "price_min", "price_max", "treatment_time", "hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
"maintain_time", "recover_time").rdd.repartition(200).map( "maintain_time", "recover_time", "search_tag2", "search_tag3","cid_id","device_id")\
.rdd.repartition(200).map(
lambda x: (x[0], float(x[1]), float(x[2]), app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map), lambda x: (x[0], float(x[1]), float(x[2]), app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map),
app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map), app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map),
app_list_func(x[8], leve2_map), app_list_func(x[9], leve2_map), app_list_func(x[10], leve2_map), app_list_func(x[8], leve2_map), app_list_func(x[9], leve2_map), app_list_func(x[10], leve2_map),
app_list_func(x[11], leve2_map), app_list_func(x[12], leve2_map), app_list_func(x[11], leve2_map), app_list_func(x[12], leve2_map),
[value_map.get(x[0],1), value_map.get(x[13],2), value_map.get(x[14],3), value_map.get(x[15],4), [value_map.get(x[0], 1), value_map.get(x[13], 2), value_map.get(x[14], 3), value_map.get(x[15], 4),
value_map.get(x[16],5),value_map.get(x[17],6), value_map.get(x[18],7), value_map.get(x[19],8), value_map.get(x[16], 5), value_map.get(x[17], 6), value_map.get(x[18], 7), value_map.get(x[19], 8),
value_map.get(x[20],9), value_map.get(x[21],10), value_map.get(x[20], 9), value_map.get(x[21], 10),
value_map.get(x[22],11), value_map.get(x[23],12), value_map.get(x[24],13), value_map.get(x[22], 11), value_map.get(x[23], 12), value_map.get(x[24], 13),
value_map.get(x[25],14), value_map.get(x[26],15)])) value_map.get(x[25], 14), value_map.get(x[26], 15)],
app_list_func(x[27], leve2_map), app_list_func(x[28], leve3_map),x[13],x[29],x[30]
))
rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER) rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
...@@ -224,11 +245,12 @@ def feature_engineer(): ...@@ -224,11 +245,12 @@ def feature_engineer():
train = rdd.map( train = rdd.map(
lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
x[10], x[11], x[12], x[13])) x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18]))
f = time.time() f = time.time()
spark.createDataFrame(train).toDF("y", "z", "app_list", "level2_list", "level3_list", spark.createDataFrame(train).toDF("y", "z", "app_list", "level2_list", "level3_list",
"tag1_list", "tag2_list", "tag3_list", "tag4_list", "tag1_list", "tag2_list", "tag3_list", "tag4_list",
"tag5_list", "tag6_list", "tag7_list", "ids") \ "tag5_list", "tag6_list", "tag7_list", "ids",
"search_tag2_list","search_tag3_list","city","cid_id","uid") \
.repartition(1).write.format("tfrecords").save(path=path + "tr/", mode="overwrite") .repartition(1).write.format("tfrecords").save(path=path + "tr/", mode="overwrite")
h = time.time() h = time.time()
print("train tfrecord done") print("train tfrecord done")
...@@ -241,11 +263,12 @@ def feature_engineer(): ...@@ -241,11 +263,12 @@ def feature_engineer():
test = rdd.filter(lambda x: x[0] == validate_date).map( test = rdd.filter(lambda x: x[0] == validate_date).map(
lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
x[10], x[11], x[12], x[13])) x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18]))
spark.createDataFrame(test).toDF("y", "z", "app_list", "level2_list", "level3_list", spark.createDataFrame(test).toDF("y", "z", "app_list", "level2_list", "level3_list",
"tag1_list", "tag2_list", "tag3_list", "tag4_list", "tag1_list", "tag2_list", "tag3_list", "tag4_list",
"tag5_list", "tag6_list", "tag7_list", "ids") \ "tag5_list", "tag6_list", "tag7_list", "ids",
"search_tag2_list","search_tag3_list","city","cid_id","uid") \
.repartition(1).write.format("tfrecords").save(path=path + "va/", mode="overwrite") .repartition(1).write.format("tfrecords").save(path=path + "va/", mode="overwrite")
print("va tfrecord done") print("va tfrecord done")
...@@ -260,7 +283,7 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map): ...@@ -260,7 +283,7 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
"u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \ "u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \
"dl.app_list,e.hospital_id,feat.level3_ids," \ "dl.app_list,e.hospital_id,feat.level3_ids," \
"wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \ "wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \
"ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7," \ "ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7,doris.search_tag2,doris.search_tag3," \
"k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \ "k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \
"from jerry_test.esmm_pre_data e " \ "from jerry_test.esmm_pre_data e " \
"left join jerry_test.user_feature u on e.device_id = u.device_id " \ "left join jerry_test.user_feature u on e.device_id = u.device_id " \
...@@ -275,63 +298,62 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map): ...@@ -275,63 +298,62 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
"left join jerry_test.order_tag ot on e.device_id = ot.device_id " \ "left join jerry_test.order_tag ot on e.device_id = ot.device_id " \
"left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \ "left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \
"left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \ "left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \
"left join jerry_test.knowledge k on feat.level2 = k.level2_id" "left join jerry_test.knowledge k on feat.level2 = k.level2_id " \
"left join jerry_test.search_doris doris on e.device_id = doris.device_id and e.stat_date = doris.get_date"
features = ["ucity_id", "ccity_name", "device_type", "manufacturer", features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id", "channel", "top", "time", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time"] "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time",
"app_list", "level3_ids", "level2_ids", "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"search_tag2", "search_tag3"]
df = spark.sql(sql) df = spark.sql(sql)
df = df.drop_duplicates(["ucity_id", "device_id", "cid_id"]) df = df.drop_duplicates(["ucity_id", "device_id", "cid_id"])
df = df.na.fill(dict(zip(features, features))) df = df.na.fill(dict(zip(features, features)))
f = time.time() f = time.time()
rdd = df.select("label", "y", "z", "ucity_id", "device_id", "cid_id", "app_list", "level2_ids", "level3_ids", rdd = df.select("label", "y", "z", "ucity_id", "device_id", "cid_id", "app_list", "level2_ids", "level3_ids",
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7", "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time", "ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
"hospital_id", "treatment_method", "price_min", "price_max", "treatment_time", "hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
"maintain_time", "recover_time") \ "maintain_time", "recover_time", "search_tag2", "search_tag3") \
.rdd.repartition(200).map(lambda x: (x[0], float(x[1]), float(x[2]), x[3], x[4], x[5], .rdd.repartition(200).map(lambda x: (x[0], float(x[1]), float(x[2]), x[3], x[4], x[5],
app_list_func(x[6], app_list_map), app_list_func(x[7], leve2_map), app_list_func(x[6], app_list_map), app_list_func(x[7], leve2_map),
app_list_func(x[8], leve3_map), app_list_func(x[9], leve2_map), app_list_func(x[8], leve3_map), app_list_func(x[9], leve2_map),
app_list_func(x[10], leve2_map), app_list_func(x[11], leve2_map), app_list_func(x[10], leve2_map), app_list_func(x[11], leve2_map),
app_list_func(x[12], leve2_map), app_list_func(x[13], leve2_map), app_list_func(x[12], leve2_map), app_list_func(x[13], leve2_map),
app_list_func(x[14], leve2_map), app_list_func(x[15], leve2_map), app_list_func(x[14], leve2_map), app_list_func(x[15], leve2_map),
[value_map.get(date,1), value_map.get(x[16],2), [value_map.get(date, 1), value_map.get(x[16], 2),
value_map.get(x[17],3), value_map.get(x[18], 4), value_map.get(x[17], 3), value_map.get(x[18], 4),
value_map.get(x[19], 5), value_map.get(x[20], 6), value_map.get(x[19], 5), value_map.get(x[20], 6),
value_map.get(x[21], 7), value_map.get(x[22], 8), value_map.get(x[21], 7), value_map.get(x[22], 8),
value_map.get(x[23], 9), value_map.get(x[24], 10), value_map.get(x[23], 9), value_map.get(x[24], 10),
value_map.get(x[25], 11), value_map.get(x[26], 12), value_map.get(x[25], 11), value_map.get(x[26], 12),
value_map.get(x[27], 13), value_map.get(x[28], 14), value_map.get(x[27], 13), value_map.get(x[28], 14),
value_map.get(x[29], 15) value_map.get(x[29], 15)], app_list_func(x[30], leve2_map),
])) app_list_func(x[31], leve3_map)))
rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER) rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
print(rdd.count())
native_pre = spark.createDataFrame(rdd.filter(lambda x:x[0] == 0).map(lambda x:(x[3],x[4],x[5])))\
.toDF("city","uid","cid_id")
print("native csv")
native_pre.toPandas().to_csv(local_path+"native.csv", header=True)
spark.createDataFrame(rdd.filter(lambda x: x[0] == 0) spark.createDataFrame(rdd.filter(lambda x: x[0] == 0)
.map(lambda x: (x[1],x[2],x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13],x[14],x[15],x[16]))) \ .map(lambda x: (x[1], x[2], x[6], x[7], x[8], x[9], x[10], x[11],
.toDF("y","z","app_list", "level2_list", "level3_list","tag1_list", "tag2_list", "tag3_list", "tag4_list", x[12], x[13], x[14], x[15], x[16], x[17], x[18],x[3],x[4],x[5]))) \
"tag5_list", "tag6_list", "tag7_list", "ids").repartition(1).write.format("tfrecords") \ .toDF("y", "z", "app_list", "level2_list", "level3_list", "tag1_list", "tag2_list", "tag3_list", "tag4_list",
.save(path=path+"native/", mode="overwrite") "tag5_list", "tag6_list", "tag7_list", "ids", "search_tag2_list","search_tag3_list","city","uid","cid_id") \
.repartition(1).write.format("tfrecords").save(path=path+"native/", mode="overwrite")
print("native tfrecord done") print("native tfrecord done")
h = time.time() h = time.time()
print((h-f)/60) print((h-f)/60)
nearby_pre = spark.createDataFrame(rdd.filter(lambda x: x[0] == 1).map(lambda x: (x[3], x[4], x[5]))) \
.toDF("city", "uid", "cid_id")
print("nearby csv")
nearby_pre.toPandas().to_csv(local_path + "nearby.csv", header=True)
spark.createDataFrame(rdd.filter(lambda x: x[0] == 1) spark.createDataFrame(rdd.filter(lambda x: x[0] == 1)
.map( .map(lambda x: (x[1], x[2], x[6], x[7], x[8], x[9], x[10], x[11],
lambda x: (x[1], x[2], x[6], x[7], x[8], x[9], x[10], x[11], x[12], x[13], x[14], x[15], x[16]))) \ x[12], x[13], x[14], x[15], x[16], x[17], x[18],x[3],x[4],x[5]))) \
.toDF("y", "z", "app_list", "level2_list", "level3_list", "tag1_list", "tag2_list", "tag3_list", "tag4_list", .toDF("y", "z", "app_list", "level2_list", "level3_list", "tag1_list", "tag2_list", "tag3_list", "tag4_list",
"tag5_list", "tag6_list", "tag7_list", "ids").repartition(1).write.format("tfrecords") \ "tag5_list", "tag6_list", "tag7_list", "ids", "search_tag2_list","search_tag3_list","city","uid","cid_id") \
.save(path=path + "nearby/", mode="overwrite") .repartition(1).write.format("tfrecords").save(path=path + "nearby/", mode="overwrite")
print("nearby tfrecord done") print("nearby tfrecord done")
......
...@@ -59,10 +59,14 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False): ...@@ -59,10 +59,14 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
"tag6_list": tf.VarLenFeature(tf.int64), "tag6_list": tf.VarLenFeature(tf.int64),
"tag7_list": tf.VarLenFeature(tf.int64), "tag7_list": tf.VarLenFeature(tf.int64),
"search_tag2_list": tf.VarLenFeature(tf.int64), "search_tag2_list": tf.VarLenFeature(tf.int64),
<<<<<<< HEAD
"search_tag3_list": tf.VarLenFeature(tf.int64), "search_tag3_list": tf.VarLenFeature(tf.int64),
"uid": tf.VarLenFeature(tf.string), "uid": tf.VarLenFeature(tf.string),
"city": tf.VarLenFeature(tf.string), "city": tf.VarLenFeature(tf.string),
"cid_id": tf.VarLenFeature(tf.string) "cid_id": tf.VarLenFeature(tf.string)
=======
"search_tag3_list": tf.VarLenFeature(tf.int64)
>>>>>>> 6219b3d856f8bfae4b542ea25d6ffb4209e193ad
} }
parsed = tf.parse_single_example(record, features) parsed = tf.parse_single_example(record, features)
y = parsed.pop('y') y = parsed.pop('y')
...@@ -131,10 +135,13 @@ def model_fn(features, labels, mode, params): ...@@ -131,10 +135,13 @@ def model_fn(features, labels, mode, params):
tag7_list = features['tag7_list'] tag7_list = features['tag7_list']
search_tag2_list = features['search_tag2_list'] search_tag2_list = features['search_tag2_list']
search_tag3_list = features['search_tag3_list'] search_tag3_list = features['search_tag3_list']
<<<<<<< HEAD
uid = features['uid'] uid = features['uid']
city = features['city'] city = features['city']
cid_id = features['cid_id'] cid_id = features['cid_id']
=======
>>>>>>> 6219b3d856f8bfae4b542ea25d6ffb4209e193ad
if FLAGS.task_type != "infer": if FLAGS.task_type != "infer":
y = labels['y'] y = labels['y']
...@@ -160,10 +167,13 @@ def model_fn(features, labels, mode, params): ...@@ -160,10 +167,13 @@ def model_fn(features, labels, mode, params):
# x_concat = tf.reshape(embedding_id,shape=[-1, common_dims]) # None * (F * K) # x_concat = tf.reshape(embedding_id,shape=[-1, common_dims]) # None * (F * K)
x_concat = tf.concat([tf.reshape(embedding_id, shape=[-1, common_dims]), app_id, level2, level3, tag1, x_concat = tf.concat([tf.reshape(embedding_id, shape=[-1, common_dims]), app_id, level2, level3, tag1,
tag2, tag3, tag4, tag5, tag6, tag7,search_tag2,search_tag3], axis=1) tag2, tag3, tag4, tag5, tag6, tag7,search_tag2,search_tag3], axis=1)
<<<<<<< HEAD
uid = tf.sparse.to_dense(uid,default_value="") uid = tf.sparse.to_dense(uid,default_value="")
city = tf.sparse.to_dense(city,default_value="") city = tf.sparse.to_dense(city,default_value="")
cid_id = tf.sparse.to_dense(cid_id,default_value="") cid_id = tf.sparse.to_dense(cid_id,default_value="")
=======
>>>>>>> 6219b3d856f8bfae4b542ea25d6ffb4209e193ad
with tf.name_scope("CVR_Task"): with tf.name_scope("CVR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN: if mode == tf.estimator.ModeKeys.TRAIN:
......
...@@ -2,12 +2,11 @@ ...@@ -2,12 +2,11 @@
import pymysql import pymysql
from pyspark.conf import SparkConf from pyspark.conf import SparkConf
import pytispark.pytispark as pti import pytispark.pytispark as pti
# from pyspark.sql import SQLContext
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
import datetime import datetime
import pandas as pd import pandas as pd
import subprocess import time
import tensorflow as tf from pyspark import StorageLevel
def app_list_func(x,l): def app_list_func(x,l):
...@@ -21,8 +20,11 @@ def app_list_func(x,l): ...@@ -21,8 +20,11 @@ def app_list_func(x,l):
return e return e
def multi_hot(df,column,n): def get_list(db,sql,n):
v = df.select(column).distinct().rdd.map(lambda x: x[0]).collect() cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
v = list(set([i[0] for i in result]))
app_list_value = [str(i).split(",") for i in v] app_list_value = [str(i).split(",") for i in v]
app_list_unique = [] app_list_unique = []
for i in app_list_value: for i in app_list_value:
...@@ -30,182 +32,352 @@ def multi_hot(df,column,n): ...@@ -30,182 +32,352 @@ def multi_hot(df,column,n):
app_list_unique = list(set(app_list_unique)) app_list_unique = list(set(app_list_unique))
number = len(app_list_unique) number = len(app_list_unique)
app_list_map = dict(zip(app_list_unique, list(range(n, number + n)))) app_list_map = dict(zip(app_list_unique, list(range(n, number + n))))
return number,app_list_map db.close()
return number, app_list_map
def feature(): def get_map():
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_data" sql = "select app_list from device_app_list"
validate_date = con_sql(db, sql)[0].values.tolist()[0] a = time.time()
print("validate_date:" + validate_date) apps_number, app_list_map = get_list(db,sql,16)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d") print("applist")
start = (temp - datetime.timedelta(days=2)).strftime("%Y-%m-%d") print((time.time()-a)/60)
print(start) db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select level2_ids from diary_feat"
b = time.time()
leve2_number, leve2_map = get_list(db, sql, 16+apps_number)
print("leve2")
print((time.time() - b) / 60)
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select level3_ids from diary_feat"
c = time.time()
leve3_number, leve3_map = get_list(db, sql, 16+leve2_number+apps_number)
print((time.time() - c) / 60)
return apps_number, app_list_map,leve2_number, leve2_map,leve3_number, leve3_map
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids " \
"from jerry_test.esmm_train_data e " \
"left join jerry_test.diary_feat feat on e.cid_id = feat.diary_id " \
"where e.stat_date >= '{}'".format(start)
df = spark.sql(sql) def get_unique(db,sql):
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
v = list(set([i[0] for i in result]))
db.close()
print(sql)
print(len(v))
return v
def con_sql(db,sql):
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
db.close()
return df
features = ["ucity_id","stat_date"] def get_pre_number():
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select count(*) from esmm_pre_data"
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchone()[0]
print("预测集数量:")
print(result)
db.close()
df = df.na.fill(dict(zip(features,features)))
apps_number, app_list_map = multi_hot(df,"level2_ids",1) def feature_engineer():
apps_number, app_list_map, level2_number, leve2_map, level3_number, leve3_map = get_map()
app_list_map["app_list"] = 16
leve3_map["level3_ids"] = 17
leve3_map["search_tag3"] = 18
leve2_map["level2_ids"] = 19
leve2_map["tag1"] = 20
leve2_map["tag2"] = 21
leve2_map["tag3"] = 22
leve2_map["tag4"] = 23
leve2_map["tag5"] = 24
leve2_map["tag6"] = 25
leve2_map["tag7"] = 26
leve2_map["search_tag2"] = 27
unique_values = [] unique_values = []
for i in features: db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
unique_values.extend(df.select(i).distinct().rdd.map(lambda x: x[0]).collect()) sql = "select distinct stat_date from esmm_train_data_dwell"
temp = list(range(2 + apps_number, unique_values.extend(get_unique(db,sql))
2 + apps_number + len(unique_values)))
value_map = dict(zip(unique_values, temp))
rdd = df.select("level2_ids","stat_date","ucity_id","y","z").rdd db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
rdd.persist() sql = "select distinct ucity_id from esmm_train_data_dwell"
# TODO 上线后把下面train fliter 删除,因为最近一天的数据也要作为训练集 unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct ccity_name from esmm_train_data_dwell"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct time from cid_time_cut"
unique_values.extend(get_unique(db, sql))
train = rdd.filter(lambda x: x[1]!= validate_date)\ db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
.map(lambda x: (app_list_func(x[0], app_list_map),[value_map[x[2]],value_map[x[1]]], float(x[3]),float(x[4]))) sql = "select distinct device_type from user_feature"
test = rdd.filter(lambda x: x[1]== validate_date)\ unique_values.extend(get_unique(db, sql))
.map(lambda x: (app_list_func(x[0], app_list_map),[value_map[x[2]],value_map[x[1]]], float(x[3]),float(x[4])))
spark.createDataFrame(test).toDF("level2_ids","ids","y","z")\ db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
.repartition(1).write.format("tfrecords").save(path=path+"va/", mode="overwrite") sql = "select distinct manufacturer from user_feature"
unique_values.extend(get_unique(db, sql))
print("va write done") db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
spark.createDataFrame(train).toDF("level2_ids","ids","y","z") \ sql = "select distinct channel from user_feature"
.repartition(1).write.format("tfrecords").save(path=path+"tr/", mode="overwrite") unique_values.extend(get_unique(db, sql))
print("done") db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
rdd.unpersist() sql = "select distinct top from cid_type_top"
unique_values.extend(get_unique(db, sql))
return validate_date,value_map,app_list_map db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct price_min from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct treatment_method from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct price_max from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct treatment_time from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct maintain_time from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct recover_time from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_data_dwell"
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=100)).strftime("%Y-%m-%d")
print(start)
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC')
sql = "select distinct doctor.hospital_id from jerry_test.esmm_train_data_dwell e " \
"left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \
"left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \
"where e.stat_date >= '{}'".format(start)
unique_values.extend(get_unique(db, sql))
features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time",
"app_list", "level3_ids", "level2_ids", "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"search_tag2", "search_tag3"]
unique_values.extend(features)
print("unique_values length")
print(len(unique_values))
print("特征维度:")
print(apps_number + level2_number + level3_number + len(unique_values))
temp = list(range(28 + apps_number + level2_number + level3_number,
28 + apps_number + level2_number + level3_number + len(unique_values)))
value_map = dict(zip(unique_values, temp))
def get_predict(date,value_map,app_list_map): sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
sql = "select e.y,e.z,e.label,e.ucity_id,feat.level2_ids,e.device_id,e.cid_id from esmm_pre_data e " \ "u.channel,c.top,cut.time,dl.app_list,feat.level3_ids,doctor.hospital_id," \
"left join diary_feat feat on e.cid_id = feat.diary_id limit 50000" "wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \
"ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7,doris.search_tag2,doris.search_tag3," \
"k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time," \
"e.device_id,e.cid_id " \
"from jerry_test.esmm_train_data_dwell e left join jerry_test.user_feature u on e.device_id = u.device_id " \
"left join jerry_test.cid_type_top c on e.device_id = c.device_id " \
"left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \
"left join jerry_test.device_app_list dl on e.device_id = dl.device_id " \
"left join jerry_test.diary_feat feat on e.cid_id = feat.diary_id " \
"left join jerry_test.knowledge k on feat.level2 = k.level2_id " \
"left join jerry_test.wiki_tag wiki on e.device_id = wiki.device_id " \
"left join jerry_test.question_tag question on e.device_id = question.device_id " \
"left join jerry_test.search_tag search on e.device_id = search.device_id " \
"left join jerry_test.budan_tag budan on e.device_id = budan.device_id " \
"left join jerry_test.order_tag ot on e.device_id = ot.device_id " \
"left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \
"left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \
"left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \
"left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \
"left join jerry_test.search_doris doris on e.device_id = doris.device_id and e.stat_date = doris.get_date " \
"where e.stat_date >= '{}'".format(start)
features = ["ucity_id"]
df = spark.sql(sql) df = spark.sql(sql)
df = df.na.fill(dict(zip(features, features)))
rdd = df.select("level2_ids","ucity_id","device_id","cid_id","label", "y", "z") \
.rdd.map(lambda x: (app_list_func(x[0], app_list_map),x[1],x[2],x[3],x[4],float(x[5]),float(x[6]),
[value_map.get(x[1], 299999),value_map.get(date, 299998)]))
rdd.persist()
native_pre = spark.createDataFrame(rdd.filter(lambda x:x[4] == 0).map(lambda x:(x[1],x[2],x[3])))\
.toDF("city","uid","cid_id")
print("native")
native_pre.toPandas().to_csv(local_path+"native.csv", header=True)
# TODO 写成csv文件改成下面这样
# native_pre.coalesce(1).write.format('com.databricks.spark.csv').save(path+"native/",header = 'true')
# 预测的tfrecord必须写成一个文件,这样可以摆保证顺序
spark.createDataFrame(rdd.filter(lambda x: x[4] == 0).map(lambda x: (x[0],x[5],x[6],x[7]))) \
.toDF("level2_ids","y","z","ids").coalesce(1).write.format("tfrecords") \
.save(path=path+"native/", mode="overwrite")
nearby_pre = spark.createDataFrame(rdd.filter(lambda x: x[4] == 1).map(lambda x:(x[1],x[2],x[3]))) \
.toDF("city", "uid", "cid_id")
print("nearby")
nearby_pre.toPandas().to_csv(local_path+"nearby.csv", header=True)
spark.createDataFrame(rdd.filter(lambda x: x[4] == 1).map(lambda x: (x[0], x[5], x[6], x[7]))) \
.toDF("level2_ids","y","z","ids").coalesce(1).write.format("tfrecords") \
.save(path=path+"nearby/", mode="overwrite")
rdd.unpersist() df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids",
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7"])
df = df.na.fill(dict(zip(features, features)))
rdd = df.select("stat_date", "y", "z", "app_list", "level2_ids", "level3_ids",
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
"hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
"maintain_time", "recover_time", "search_tag2", "search_tag3","cid_id","device_id")\
.rdd.repartition(200).map(
lambda x: (x[0], float(x[1]), float(x[2]), app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map),
app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map),
app_list_func(x[8], leve2_map), app_list_func(x[9], leve2_map), app_list_func(x[10], leve2_map),
app_list_func(x[11], leve2_map), app_list_func(x[12], leve2_map),
[value_map.get(x[0], 1), value_map.get(x[13], 2), value_map.get(x[14], 3), value_map.get(x[15], 4),
value_map.get(x[16], 5), value_map.get(x[17], 6), value_map.get(x[18], 7), value_map.get(x[19], 8),
value_map.get(x[20], 9), value_map.get(x[21], 10),
value_map.get(x[22], 11), value_map.get(x[23], 12), value_map.get(x[24], 13),
value_map.get(x[25], 14), value_map.get(x[26], 15)],
app_list_func(x[27], leve2_map), app_list_func(x[28], leve3_map),x[13],x[29],x[30]
))
rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
def con_sql(db,sql): # TODO 上线后把下面train fliter 删除,因为最近一天的数据也要作为训练集
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
db.close()
return df
def get_filename(dir_in): train = rdd.map(
pre_add = "hdfs://172.16.32.4:8020/strategy/esmm/" lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
x = [] x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18]))
for i in range(0,200): f = time.time()
if i < 10: spark.createDataFrame(train).toDF("y", "z", "app_list", "level2_list", "level3_list",
t = pre_add+dir_in+"/part-r-0000"+str(i) "tag1_list", "tag2_list", "tag3_list", "tag4_list",
x.append(t) "tag5_list", "tag6_list", "tag7_list", "ids",
elif 10 <= i < 100: "search_tag2_list","search_tag3_list","city","cid_id","uid") \
t = pre_add + dir_in + "/part-r-000" + str(i) .repartition(1).write.format("tfrecords").save(path=path + "tr/", mode="overwrite")
x.append(t) h = time.time()
elif 100 <= i < 200: print("train tfrecord done")
t = pre_add + dir_in + "/part-r-00" + str(i) print((h - f) / 60)
x.append(t)
return x
def get_hdfs(dir_in):
pre_path = "hdfs://172.16.32.4:8020"
args = "hdfs dfs -ls " + dir_in + " | awk '{print $8}'"
proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
s_output, s_err = proc.communicate()
all_dart_dirs = s_output.split()
a = []
for i in all_dart_dirs:
b = str(i).split("/")[4]
if b[:4] == "part":
tmp = pre_path + str(i)[2:-1]
a.append(tmp)
return a
print("训练集样本总量:")
print(rdd.count())
def get_pre_number(): get_pre_number()
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select count(*) from esmm_pre_data"
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchone()[0]
print("预测集数量:")
print(result)
db.close()
test = rdd.filter(lambda x: x[0] == validate_date).map(
lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18]))
if __name__ == '__main__': spark.createDataFrame(test).toDF("y", "z", "app_list", "level2_list", "level3_list",
# get_pre() "tag1_list", "tag2_list", "tag3_list", "tag4_list",
# sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \ "tag5_list", "tag6_list", "tag7_list", "ids",
# .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \ "search_tag2_list","search_tag3_list","city","cid_id","uid") \
# .set("spark.tispark.plan.allow_index_double_read", "false") \ .repartition(1).write.format("tfrecords").save(path=path + "va/", mode="overwrite")
# .set("spark.tispark.plan.allow_index_read", "true") \
# .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
# .set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf")\
# .set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec","snappy")
#
# spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
# ti = pti.TiContext(spark)
# ti.tidbMapDatabase("jerry_test")
# # ti.tidbMapDatabase("eagle")
# spark.sparkContext.setLogLevel("WARN")
# path = "hdfs:///strategy/esmm/"
# local_path = "/home/gmuser/esmm/"
#
# validate_date, value_map, app_list_map = feature()
# get_predict(validate_date, value_map, app_list_map)
#
#
# spark = SparkSession.builder.getOrCreate()
#
# b = [("a", 1), ("a", 1), ("b", 3), ("a", 2)]
# rdd = spark.sparkContext.parallelize(b)
# df = spark.createDataFrame(rdd).toDF("id", "n")
# df.show()
# df.createOrReplaceTempView("df")
# t = spark.sql("select id from df").map()
print("va tfrecord done")
rdd.unpersist()
return validate_date, value_map, app_list_map, leve2_map, leve3_map
def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
sql = "select e.y,e.z,e.label,e.ucity_id,feat.level2_ids,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \
"dl.app_list,e.hospital_id,feat.level3_ids," \
"wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \
"ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7,doris.search_tag2,doris.search_tag3," \
"k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \
"from jerry_test.esmm_pre_data e " \
"left join jerry_test.user_feature u on e.device_id = u.device_id " \
"left join jerry_test.cid_type_top c on e.device_id = c.device_id " \
"left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \
"left join jerry_test.device_app_list dl on e.device_id = dl.device_id " \
"left join jerry_test.diary_feat feat on e.cid_id = feat.diary_id " \
"left join jerry_test.wiki_tag wiki on e.device_id = wiki.device_id " \
"left join jerry_test.question_tag question on e.device_id = question.device_id " \
"left join jerry_test.search_tag search on e.device_id = search.device_id " \
"left join jerry_test.budan_tag budan on e.device_id = budan.device_id " \
"left join jerry_test.order_tag ot on e.device_id = ot.device_id " \
"left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \
"left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \
"left join jerry_test.knowledge k on feat.level2 = k.level2_id " \
"left join jerry_test.search_doris doris on e.device_id = doris.device_id and e.stat_date = doris.get_date"
features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time",
"app_list", "level3_ids", "level2_ids", "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"search_tag2", "search_tag3"]
df = spark.sql(sql)
df = df.drop_duplicates(["ucity_id", "device_id", "cid_id"])
df = df.na.fill(dict(zip(features, features)))
f = time.time()
rdd = df.select("label", "y", "z", "ucity_id", "device_id", "cid_id", "app_list", "level2_ids", "level3_ids",
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
"hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
"maintain_time", "recover_time", "search_tag2", "search_tag3") \
.rdd.repartition(200).map(lambda x: (x[0], float(x[1]), float(x[2]), x[3], x[4], x[5],
app_list_func(x[6], app_list_map), app_list_func(x[7], leve2_map),
app_list_func(x[8], leve3_map), app_list_func(x[9], leve2_map),
app_list_func(x[10], leve2_map), app_list_func(x[11], leve2_map),
app_list_func(x[12], leve2_map), app_list_func(x[13], leve2_map),
app_list_func(x[14], leve2_map), app_list_func(x[15], leve2_map),
[value_map.get(date, 1), value_map.get(x[16], 2),
value_map.get(x[17], 3), value_map.get(x[18], 4),
value_map.get(x[19], 5), value_map.get(x[20], 6),
value_map.get(x[21], 7), value_map.get(x[22], 8),
value_map.get(x[23], 9), value_map.get(x[24], 10),
value_map.get(x[25], 11), value_map.get(x[26], 12),
value_map.get(x[27], 13), value_map.get(x[28], 14),
value_map.get(x[29], 15)], app_list_func(x[30], leve2_map),
app_list_func(x[31], leve3_map)))
rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
print(rdd.count())
spark.createDataFrame(rdd.filter(lambda x: x[0] == 0)
.map(lambda x: (x[1], x[2], x[6], x[7], x[8], x[9], x[10], x[11],
x[12], x[13], x[14], x[15], x[16], x[17], x[18],x[3],x[4],x[5]))) \
.toDF("y", "z", "app_list", "level2_list", "level3_list", "tag1_list", "tag2_list", "tag3_list", "tag4_list",
"tag5_list", "tag6_list", "tag7_list", "ids", "search_tag2_list","search_tag3_list","city","uid","cid_id") \
.repartition(1).write.format("tfrecords").save(path=path+"native/", mode="overwrite")
print("native tfrecord done")
h = time.time()
print((h-f)/60)
spark.createDataFrame(rdd.filter(lambda x: x[0] == 1)
.map(lambda x: (x[1], x[2], x[6], x[7], x[8], x[9], x[10], x[11],
x[12], x[13], x[14], x[15], x[16], x[17], x[18],x[3],x[4],x[5]))) \
.toDF("y", "z", "app_list", "level2_list", "level3_list", "tag1_list", "tag2_list", "tag3_list", "tag4_list",
"tag5_list", "tag6_list", "tag7_list", "ids", "search_tag2_list","search_tag3_list","city","uid","cid_id") \
.repartition(1).write.format("tfrecords").save(path=path + "nearby/", mode="overwrite")
print("nearby tfrecord done")
if __name__ == '__main__':
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true") \
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
.set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf")\
.set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec","snappy")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test")
ti.tidbMapDatabase("eagle")
spark.sparkContext.setLogLevel("WARN")
path = "hdfs:///strategy/esmm/"
local_path = "/home/gmuser/esmm/"
validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer()
get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map)
spark.stop()
......
# -*- coding: utf-8 -*-
import pymysql
from pyspark.conf import SparkConf
import pytispark.pytispark as pti
from pyspark.sql import SparkSession
import datetime
import pandas as pd
import time
from pyspark import StorageLevel
def app_list_func(x,l):
b = str(x).split(",")
e = []
for i in b:
if i in l.keys():
e.append(l[i])
else:
e.append(0)
return e
def get_list(db,sql,n):
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
v = list(set([i[0] for i in result]))
app_list_value = [str(i).split(",") for i in v]
app_list_unique = []
for i in app_list_value:
app_list_unique.extend(i)
app_list_unique = list(set(app_list_unique))
number = len(app_list_unique)
app_list_map = dict(zip(app_list_unique, list(range(n, number + n))))
db.close()
return number, app_list_map
def get_map():
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select app_list from device_app_list"
a = time.time()
apps_number, app_list_map = get_list(db,sql,16)
print("applist")
print((time.time()-a)/60)
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select level2_ids from diary_feat"
b = time.time()
leve2_number, leve2_map = get_list(db, sql, 16+apps_number)
print("leve2")
print((time.time() - b) / 60)
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select level3_ids from diary_feat"
c = time.time()
leve3_number, leve3_map = get_list(db, sql, 16+leve2_number+apps_number)
print((time.time() - c) / 60)
return apps_number, app_list_map,leve2_number, leve2_map,leve3_number, leve3_map
def get_unique(db,sql):
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
v = list(set([i[0] for i in result]))
db.close()
print(sql)
print(len(v))
return v
def con_sql(db,sql):
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
db.close()
return df
def get_pre_number():
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select count(*) from esmm_pre_data"
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchone()[0]
print("预测集数量:")
print(result)
db.close()
def feature_engineer():
apps_number, app_list_map, level2_number, leve2_map, level3_number, leve3_map = get_map()
app_list_map["app_list"] = 16
leve3_map["level3_ids"] = 17
leve3_map["search_tag3"] = 18
leve2_map["level2_ids"] = 19
leve2_map["tag1"] = 20
leve2_map["tag2"] = 21
leve2_map["tag3"] = 22
leve2_map["tag4"] = 23
leve2_map["tag5"] = 24
leve2_map["tag6"] = 25
leve2_map["tag7"] = 26
leve2_map["search_tag2"] = 27
unique_values = []
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct stat_date from esmm_train_data_dwell"
unique_values.extend(get_unique(db,sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct ucity_id from esmm_train_data_dwell"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct ccity_name from esmm_train_data_dwell"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct time from cid_time_cut"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct device_type from user_feature"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct manufacturer from user_feature"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct channel from user_feature"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct top from cid_type_top"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct price_min from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct treatment_method from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct price_max from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct treatment_time from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct maintain_time from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct recover_time from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_data_dwell"
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=3)).strftime("%Y-%m-%d")
print(start)
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC')
sql = "select doctor.hospital_id from jerry_test.esmm_train_data_dwell e " \
"left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \
"left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \
"where e.stat_date >= '{}'".format(start)
unique_values.extend(get_unique(db, sql))
features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time",
"app_list","level3_ids", "level2_ids","tag1", "tag2", "tag3", "tag4","tag5", "tag6", "tag7",
"search_tag2","search_tag3"]
unique_values.extend(features)
print("unique_values length")
print(len(unique_values))
print("特征维度:")
print(apps_number + level2_number + level3_number + len(unique_values))
temp = list(range(28 + apps_number + level2_number + level3_number,
28 + apps_number + level2_number + level3_number + len(unique_values)))
value_map = dict(zip(unique_values, temp))
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
"u.channel,c.top,cut.time,dl.app_list,feat.level3_ids,doctor.hospital_id," \
"wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \
"ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7,doris.search_tag2,doris.search_tag3," \
"k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \
"from jerry_test.esmm_train_data_dwell e left join jerry_test.user_feature u on e.device_id = u.device_id " \
"left join jerry_test.cid_type_top c on e.device_id = c.device_id " \
"left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \
"left join jerry_test.device_app_list dl on e.device_id = dl.device_id " \
"left join jerry_test.diary_feat feat on e.cid_id = feat.diary_id " \
"left join jerry_test.knowledge k on feat.level2 = k.level2_id " \
"left join jerry_test.wiki_tag wiki on e.device_id = wiki.device_id " \
"left join jerry_test.question_tag question on e.device_id = question.device_id " \
"left join jerry_test.search_tag search on e.device_id = search.device_id " \
"left join jerry_test.budan_tag budan on e.device_id = budan.device_id " \
"left join jerry_test.order_tag ot on e.device_id = ot.device_id " \
"left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \
"left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \
"left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \
"left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \
"left join jerry_test.search_doris doris on e.device_id = doris.device_id and e.stat_date = doris.get_date " \
"where e.stat_date >= '{}'".format(start)
df = spark.sql(sql)
df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids",
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7","search_tag2","search_tag3"])
df = df.na.fill(dict(zip(features, features)))
rdd = df.select("stat_date", "y", "z", "app_list", "level2_ids", "level3_ids",
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
"hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
"maintain_time", "recover_time","search_tag2","search_tag3")\
.rdd.repartition(200).map(
lambda x: (x[0], float(x[1]), float(x[2]), app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map),
app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map),
app_list_func(x[8], leve2_map), app_list_func(x[9], leve2_map), app_list_func(x[10], leve2_map),
app_list_func(x[11], leve2_map), app_list_func(x[12], leve2_map),
[value_map.get(x[0], 1), value_map.get(x[13], 2), value_map.get(x[14], 3), value_map.get(x[15], 4),
value_map.get(x[16], 5), value_map.get(x[17], 6), value_map.get(x[18], 7), value_map.get(x[19], 8),
value_map.get(x[20], 9), value_map.get(x[21], 10),
value_map.get(x[22], 11), value_map.get(x[23], 12), value_map.get(x[24], 13),
value_map.get(x[25], 14), value_map.get(x[26], 15)],
app_list_func(x[27], leve2_map),app_list_func(x[28], leve3_map)
))
rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
# TODO 上线后把下面train fliter 删除,因为最近一天的数据也要作为训练集
train = rdd.map(
lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
x[10], x[11], x[12], x[13],x[14],x[15]))
f = time.time()
spark.createDataFrame(train).toDF("y", "z", "app_list", "level2_list", "level3_list",
"tag1_list", "tag2_list", "tag3_list", "tag4_list",
"tag5_list", "tag6_list", "tag7_list", "ids","search_tag2_list","search_tag3_list") \
.repartition(1).write.format("tfrecords").save(path=path + "tr/", mode="overwrite")
h = time.time()
print("train tfrecord done")
print((h - f) / 60)
print("训练集样本总量:")
print(rdd.count())
get_pre_number()
test = rdd.filter(lambda x: x[0] == validate_date).map(
lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
x[10], x[11], x[12], x[13],x[14],x[15]))
spark.createDataFrame(test).toDF("y", "z", "app_list", "level2_list", "level3_list",
"tag1_list", "tag2_list", "tag3_list", "tag4_list",
"tag5_list", "tag6_list", "tag7_list", "ids","search_tag2_list","search_tag3_list") \
.repartition(1).write.format("tfrecords").save(path=path + "va/", mode="overwrite")
print("va tfrecord done")
rdd.unpersist()
return validate_date, value_map, app_list_map, leve2_map, leve3_map
def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
sql = "select e.y,e.z,e.label,e.ucity_id,feat.level2_ids,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \
"dl.app_list,e.hospital_id,feat.level3_ids," \
"wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \
"ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7,doris.search_tag2,doris.search_tag3," \
"k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \
"from jerry_test.esmm_pre_data e " \
"left join jerry_test.user_feature u on e.device_id = u.device_id " \
"left join jerry_test.cid_type_top c on e.device_id = c.device_id " \
"left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \
"left join jerry_test.device_app_list dl on e.device_id = dl.device_id " \
"left join jerry_test.diary_feat feat on e.cid_id = feat.diary_id " \
"left join jerry_test.wiki_tag wiki on e.device_id = wiki.device_id " \
"left join jerry_test.question_tag question on e.device_id = question.device_id " \
"left join jerry_test.search_tag search on e.device_id = search.device_id " \
"left join jerry_test.budan_tag budan on e.device_id = budan.device_id " \
"left join jerry_test.order_tag ot on e.device_id = ot.device_id " \
"left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \
"left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \
"left join jerry_test.knowledge k on feat.level2 = k.level2_id " \
"left join jerry_test.search_doris doris on e.device_id = doris.device_id and e.stat_date = doris.get_date " \
"limit 60000"
features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id",
"treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time",
"app_list", "level3_ids", "level2_ids", "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"search_tag2", "search_tag3"]
df = spark.sql(sql)
df = df.drop_duplicates(["ucity_id", "device_id", "cid_id"])
df = df.na.fill(dict(zip(features, features)))
f = time.time()
rdd = df.select("label", "y", "z", "ucity_id", "device_id", "cid_id", "app_list", "level2_ids", "level3_ids",
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
"hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
"maintain_time", "recover_time","search_tag2", "search_tag3") \
.rdd.repartition(200).map(lambda x: (x[0], float(x[1]), float(x[2]), x[3], x[4], x[5],
app_list_func(x[6], app_list_map), app_list_func(x[7], leve2_map),
app_list_func(x[8], leve3_map), app_list_func(x[9], leve2_map),
app_list_func(x[10], leve2_map), app_list_func(x[11], leve2_map),
app_list_func(x[12], leve2_map), app_list_func(x[13], leve2_map),
app_list_func(x[14], leve2_map), app_list_func(x[15], leve2_map),
[value_map.get(date,1), value_map.get(x[16],2),
value_map.get(x[17],3), value_map.get(x[18], 4),
value_map.get(x[19], 5), value_map.get(x[20], 6),
value_map.get(x[21], 7), value_map.get(x[22], 8),
value_map.get(x[23], 9), value_map.get(x[24], 10),
value_map.get(x[25], 11), value_map.get(x[26], 12),
value_map.get(x[27], 13), value_map.get(x[28], 14),
value_map.get(x[29], 15)],app_list_func(x[30], leve2_map),
app_list_func(x[31], leve3_map)))
rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
# native_pre = spark.createDataFrame(rdd.filter(lambda x:x[0] == 0).map(lambda x:(x[3],x[4],x[5],x[17])))\
# .toDF("city","uid","cid_id","number")
# print("native csv")
# native_pre.toPandas().to_csv(local_path+"native.csv", header=True)
spark.createDataFrame(rdd.filter(lambda x: x[0] == 0)
.map(lambda x: (x[1],x[2],x[6],x[7],x[8],x[9],x[10],x[11],
x[12],x[13],x[14],x[15],x[16],x[17],x[18]))) \
.toDF("y","z","app_list", "level2_list", "level3_list","tag1_list", "tag2_list", "tag3_list", "tag4_list",
"tag5_list", "tag6_list", "tag7_list", "ids","search_tag2","search_tag3")\
.repartition(1).write.format("tfrecords").save(path=path+"native/", mode="overwrite")
print("native tfrecord done")
h = time.time()
print((h-f)/60)
# nearby_pre = spark.createDataFrame(rdd.filter(lambda x: x[0] == 1).map(lambda x: (x[3], x[4], x[5],x[17]))) \
# .toDF("city", "uid", "cid_id","number")
# print("nearby csv")
# nearby_pre.toPandas().to_csv(local_path + "nearby.csv", header=True)
spark.createDataFrame(rdd.filter(lambda x: x[0] == 1)
.map(lambda x: (x[1], x[2], x[6], x[7], x[8], x[9], x[10], x[11],
x[12],x[13], x[14], x[15], x[16],x[17],x[18]))) \
.toDF("y", "z", "app_list", "level2_list", "level3_list", "tag1_list", "tag2_list", "tag3_list", "tag4_list",
"tag5_list", "tag6_list", "tag7_list", "ids","search_tag2", "search_tag3")\
.repartition(1).write.format("tfrecords").save(path=path + "nearby/", mode="overwrite")
print("nearby tfrecord done")
if __name__ == '__main__':
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true") \
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
.set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf")\
.set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec","snappy")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test")
ti.tidbMapDatabase("eagle")
spark.sparkContext.setLogLevel("WARN")
path = "hdfs:///strategy/esmm/"
local_path = "/home/gmuser/esmm/"
validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer()
get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map)
spark.stop()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment