Commit a0c75290 authored by 张彦钊's avatar 张彦钊

Merge branch 'zhao' into 'master'

Zhao

See merge request !17
parents 3049b5b5 d0ed7c24
......@@ -36,19 +36,29 @@ def get_data():
print(start)
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
"u.channel,c.top,e.device_id,cut.time,dl.app_list,e.diary_service_id,feat.level3_ids,feat.level2 " \
"u.channel,c.top,e.device_id,cut.time,dl.app_list,e.diary_service_id,feat.level3_ids,feat.level2," \
"wiki.tag,question.tag,search.tag,budan.tag,order_tag.tag,sixin.tag,cart.tag " \
"from {} e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_time_cut cut on e.cid_id = cut.cid " \
"left join device_app_list dl on e.device_id = dl.device_id " \
"left join diary_feat feat on e.cid_id = feat.diary_id " \
"left join wiki_tag wiki on e.device_id = wiki.device_id " \
"left join question_tag question on e.device_id = question.device_id " \
"left join search_tag search on e.device_id = search.device_id " \
"left join budan_tag budan on e.device_id = budan.device_id " \
"left join order_tag on e.device_id = order_tag.device_id " \
"left join sixin_tag sixin on e.device_id = sixin.device_id " \
"left join cart_tag cart on e.device_id = cart.device_id " \
"where e.stat_date >= '{}'".format(train_data_set, start)
# 上面order_tag 表不要简称为order,因为order是mysql的保留字,例如order by
df = con_sql(db, sql)
# print(df.shape)
df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "device_id",
11: "time", 12: "app_list", 13: "service_id", 14: "level3_ids", 15: "level2"})
11: "time", 12: "app_list", 13: "service_id", 14: "level3_ids", 15: "level2",
16:"tag1",17:"tag2",18:"tag3",19:"tag4",20:"tag5",21:"tag6",22:"tag7"})
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select level2_id,treatment_method,price_min,price_max,treatment_time,maintain_time,recover_time " \
......@@ -89,6 +99,10 @@ def get_data():
level2_number, level2_map = multi_hot(df, "clevel2_id", 2 + app_list_number)
level3_number, level3_map = multi_hot(df, "level3_ids", 2 + app_list_number + level2_number)
for i in ["tag1","tag2","tag3","tag4","tag5","tag6","tag7"]:
df[i] = df[i].fillna("lost_na")
df[i] = df[i].apply(app_list_func, args=(level2_map,))
unique_values = []
features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "hospital_id",
......@@ -105,8 +119,9 @@ def get_data():
value_map = dict(zip(unique_values, temp))
df = df.drop("device_id", axis=1)
# 最近一天的数据集放进训练集,这样用户的正、负反馈能及时获取
# TODO 上线后把最近一天的数据集放进训练集,这样用户的正、负反馈能及时获取
train = df
test = df[df["stat_date"] == validate_date + "stat_date"]
for i in ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "hospital_id",
......@@ -151,17 +166,25 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.label,e.ucity_id,feat.level2_ids,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \
"dl.app_list,e.hospital_id,feat.level3_ids,feat.level2 " \
"dl.app_list,e.hospital_id,feat.level3_ids,feat.level2," \
"wiki.tag,question.tag,search.tag,budan.tag,order_tag.tag,sixin.tag,cart.tag " \
"from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_time_cut cut on e.cid_id = cut.cid " \
"left join device_app_list dl on e.device_id = dl.device_id " \
"left join diary_feat feat on e.cid_id = feat.diary_id"
"left join diary_feat feat on e.cid_id = feat.diary_id " \
"left join wiki_tag wiki on e.device_id = wiki.device_id " \
"left join question_tag question on e.device_id = question.device_id " \
"left join search_tag search on e.device_id = search.device_id " \
"left join budan_tag budan on e.device_id = budan.device_id " \
"left join order_tag on e.device_id = order_tag.device_id " \
"left join sixin_tag sixin on e.device_id = sixin.device_id " \
"left join cart_tag cart on e.device_id = cart.device_id"
df = con_sql(db, sql)
df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "device_id",
11: "cid_id", 12: "time", 13: "app_list", 14: "hospital_id", 15: "level3_ids",
16: "level2"})
16: "level2",17:"tag1",18:"tag2",19:"tag3",20:"tag4",21:"tag5",22:"tag6",23:"tag7"})
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select level2_id,treatment_method,price_min,price_max,treatment_time,maintain_time,recover_time " \
......@@ -185,6 +208,10 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
df["level3_ids"] = df["level3_ids"].fillna("lost_na")
df["level3_ids"] = df["level3_ids"].apply(app_list_func, args=(level3_map,))
for i in ["tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7"]:
df[i] = df[i].fillna("lost_na")
df[i] = df[i].apply(app_list_func, args=(level2_map,))
# print("predict shape")
# print(df.shape)
df["uid"] = df["device_id"]
......
......@@ -36,13 +36,28 @@ def gen_tfrecords(in_file):
app_list = np.array(str(df["app_list"][i]).split(","))
level2_list = np.array(str(df["clevel2_id"][i]).split(","))
level3_list = np.array(str(df["level3_ids"][i]).split(","))
tag1_list = np.array(str(df["tag1"][i]).split(","))
tag2_list = np.array(str(df["tag2"][i]).split(","))
tag3_list = np.array(str(df["tag3"][i]).split(","))
tag4_list = np.array(str(df["tag4"][i]).split(","))
tag5_list = np.array(str(df["tag5"][i]).split(","))
tag6_list = np.array(str(df["tag6"][i]).split(","))
tag7_list = np.array(str(df["tag7"][i]).split(","))
features = tf.train.Features(feature={
"y": tf.train.Feature(float_list=tf.train.FloatList(value=[df["y"][i]])),
"z": tf.train.Feature(float_list=tf.train.FloatList(value=[df["z"][i]])),
"ids": tf.train.Feature(int64_list=tf.train.Int64List(value=id.astype(np.int))),
"app_list": tf.train.Feature(int64_list=tf.train.Int64List(value=app_list.astype(np.int))),
"level2_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level2_list.astype(np.int))),
"level3_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level3_list.astype(np.int)))
"level3_list": tf.train.Feature(int64_list=tf.train.Int64List(value=level3_list.astype(np.int))),
"tag1_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag1_list.astype(np.int))),
"tag2_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag2_list.astype(np.int))),
"tag3_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag3_list.astype(np.int))),
"tag4_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag4_list.astype(np.int))),
"tag5_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag5_list.astype(np.int))),
"tag6_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag6_list.astype(np.int))),
"tag7_list": tf.train.Feature(int64_list=tf.train.Int64List(value=tag7_list.astype(np.int)))
})
example = tf.train.Example(features = features)
......
......@@ -54,7 +54,14 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
"ids": tf.FixedLenFeature([FLAGS.field_size], tf.int64),
"app_list": tf.VarLenFeature(tf.int64),
"level2_list": tf.VarLenFeature(tf.int64),
"level3_list": tf.VarLenFeature(tf.int64)
"level3_list": tf.VarLenFeature(tf.int64),
"tag1_list": tf.VarLenFeature(tf.int64),
"tag2_list": tf.VarLenFeature(tf.int64),
"tag3_list": tf.VarLenFeature(tf.int64),
"tag4_list": tf.VarLenFeature(tf.int64),
"tag5_list": tf.VarLenFeature(tf.int64),
"tag6_list": tf.VarLenFeature(tf.int64),
"tag7_list": tf.VarLenFeature(tf.int64)
}
parsed = tf.parse_single_example(record, features)
......@@ -103,6 +110,13 @@ def model_fn(features, labels, mode, params):
app_list = features['app_list']
level2_list = features['level2_list']
level3_list = features['level3_list']
tag1_list = features['tag1_list']
tag2_list = features['tag2_list']
tag3_list = features['tag3_list']
tag4_list = features['tag4_list']
tag5_list = features['tag5_list']
tag6_list = features['tag6_list']
tag7_list = features['tag7_list']
if FLAGS.task_type != "infer":
y = labels['y']
......@@ -114,10 +128,17 @@ def model_fn(features, labels, mode, params):
app_id = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=app_list, sp_weights=None, combiner="sum")
level2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=level2_list, sp_weights=None, combiner="sum")
level3 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=level3_list, sp_weights=None, combiner="sum")
tag1 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag1_list, sp_weights=None, combiner="sum")
tag2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag2_list, sp_weights=None, combiner="sum")
tag3 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag3_list, sp_weights=None, combiner="sum")
tag4 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag4_list, sp_weights=None, combiner="sum")
tag5 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag5_list, sp_weights=None, combiner="sum")
tag6 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag6_list, sp_weights=None, combiner="sum")
tag7 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag7_list, sp_weights=None, combiner="sum")
# x_concat = tf.reshape(embedding_id,shape=[-1, common_dims]) # None * (F * K)
x_concat = tf.concat([tf.reshape(embedding_id,shape=[-1,common_dims]),app_id,level2,level3], axis=1)
x_concat = tf.concat([tf.reshape(embedding_id,shape=[-1,common_dims]),app_id,level2,level3,tag1,
tag2,tag3,tag4,tag5,tag6,tag7], axis=1)
with tf.name_scope("CVR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN:
......
......@@ -32,12 +32,16 @@ def multi_hot(df,column,n):
def feature_engineer():
# TODO 删除下面的测试写入
df = spark.sql("select y,z from esmm_train_data limit 60")
df.write.format("avro").save(path=path + "tr", mode="overwrite")
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_data"
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=3)).strftime("%Y-%m-%d")
start = (temp - datetime.timedelta(days=2)).strftime("%Y-%m-%d")
print(start)
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment