Commit 8ca8aa06 authored by 张彦钊's avatar 张彦钊

增加应用列表特征

parent bee2d7ff
...@@ -2,11 +2,6 @@ ...@@ -2,11 +2,6 @@
import pandas as pd import pandas as pd
import pymysql import pymysql
import datetime import datetime
from pyspark.sql import HiveContext
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
def con_sql(db,sql): def con_sql(db,sql):
...@@ -29,21 +24,22 @@ def get_data(): ...@@ -29,21 +24,22 @@ def get_data():
validate_date = con_sql(db, sql)[0].values.tolist()[0] validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date) print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d") temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=50)).strftime("%Y-%m-%d") start = (temp - datetime.timedelta(days=20)).strftime("%Y-%m-%d")
print(start) print(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name," \ sql = "select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,cl.l1,cl.l2,e.device_id,cut.time " \ "u.device_type,u.manufacturer,u.channel,c.top,cl.l1,cl.l2,e.device_id,cut.time,dl.app_list " \
"from {} e left join user_feature u on e.device_id = u.device_id " \ "from {} e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id " \ "left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_level2 cl on e.cid_id = cl.cid " \ "left join cid_level2 cl on e.cid_id = cl.cid " \
"left join cid_time_cut cut on e.cid_id = cut.cid " \ "left join cid_time_cut cut on e.cid_id = cut.cid " \
"left join device_app_list dl on e.device_id = dl.device_id " \
"where e.stat_date >= '{}'".format(train_data_set,start) "where e.stat_date >= '{}'".format(train_data_set,start)
df = con_sql(db, sql) df = con_sql(db, sql)
# print(df.shape) # print(df.shape)
df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name", df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "l1",11: "l2", 6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "l1",11: "l2",
12: "device_id", 13: "time"}) 12: "device_id", 13: "time",14:"app_list"})
print("esmm data ok") print("esmm data ok")
# print(df.head(2) # print(df.head(2)
print("before") print("before")
...@@ -51,7 +47,19 @@ def get_data(): ...@@ -51,7 +47,19 @@ def get_data():
print("after") print("after")
df = df.drop_duplicates() df = df.drop_duplicates()
df = df.drop_duplicates(["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer", df = df.drop_duplicates(["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "l1","l2", "time", "stat_date"]) "channel", "top", "l1","l2", "time", "stat_date","app_list"])
df["app_list"] = df["app_list"].fillna("lost_na")
app_list_value = [i.split(",") for i in df["app_list"].unique()]
app_list_unique = []
for i in app_list_value:
app_list_unique.extend(i)
app_list_unique = list(set(app_list_unique))
app_list_map = dict(zip(app_list_unique, list(range(1, len(app_list_unique) + 1))))
df["app_list"] = df["app_list"].apply(app_list_func,args=(app_list_map,))
print("after applist map")
print(df.head(2))
# print(df.shape) # print(df.shape)
# print("exp numbers:") # print("exp numbers:")
# print(df[df["y"] == 0].shape) # print(df[df["y"] == 0].shape)
...@@ -80,7 +88,7 @@ def get_data(): ...@@ -80,7 +88,7 @@ def get_data():
print(len(unique_values)) print(len(unique_values))
print(df.head(2)) print(df.head(2))
temp = list(range(1,len(unique_values)+1)) temp = list(range(len(app_list_unique) + 1,len(app_list_unique) + len(unique_values)+1))
value_map = dict(zip(unique_values,temp)) value_map = dict(zip(unique_values,temp))
df = df.drop("device_id", axis=1) df = df.drop("device_id", axis=1)
...@@ -102,6 +110,14 @@ def get_data(): ...@@ -102,6 +110,14 @@ def get_data():
return validate_date,value_map return validate_date,value_map
def app_list_func(x,l):
b = x.split(",")
e = []
for i in b:
e.append(l[i])
return ",".join([str(j) for j in e])
def write_csv(df,name,n): def write_csv(df,name,n):
for i in range(0, df.shape[0], n): for i in range(0, df.shape[0], n):
if i == 0: if i == 0:
...@@ -112,22 +128,6 @@ def write_csv(df,name,n): ...@@ -112,22 +128,6 @@ def write_csv(df,name,n):
temp = df.iloc[i:i + n] temp = df.iloc[i:i + n]
temp.to_csv(path + name+ "/{}_{}.csv".format(name,i), index=False) temp.to_csv(path + name+ "/{}_{}.csv".format(name,i), index=False)
def esmm_pre():
yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
print(yesterday)
conf = SparkConf().setAppName("esmm_pre").set("spark.io.compression.codec", "lzf")
spark = SparkSession.builder.getOrCreate(conf)
spark.sql("""
select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from (select device_id,if(city_id='world','worldwide',city_id) city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix
union select device_id,if(city_id='world','worldwide',city_id) city_id,native_queue as merge_queue from ffm_diary_queue
union select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1 where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='{}'
""".format(yesterday)).show(6)
def get_predict(date,value_map): def get_predict(date,value_map):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
......
...@@ -32,15 +32,15 @@ rm ${DATA_PATH}/nearby/nearby_* ...@@ -32,15 +32,15 @@ rm ${DATA_PATH}/nearby/nearby_*
echo "train..." echo "train..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.5,0.5,0.5,0.5,0.5 --optimizer=Adam --num_epochs=2 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=2000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train ${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.5,0.5,0.5,0.5,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=12 --feature_size=20000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train
echo "infer native..." echo "infer native..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.5,0.5,0.5,0.5,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=2000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer > ${DATA_PATH}/infer.log ${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.5,0.5,0.5,0.5,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=12 --feature_size=20000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer > ${DATA_PATH}/infer.log
echo "infer nearby..." echo "infer nearby..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.5,0.5,0.5,0.5,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=2000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer > ${DATA_PATH}/infer.log ${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.5,0.5,0.5,0.5,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=12 --feature_size=20000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer > ${DATA_PATH}/infer.log
echo "sort and 2sql" echo "sort and 2sql"
${PYTHON_PATH} ${MODEL_PATH}/to_database.py ${PYTHON_PATH} ${MODEL_PATH}/to_database.py
...@@ -33,10 +33,12 @@ def gen_tfrecords(in_file): ...@@ -33,10 +33,12 @@ def gen_tfrecords(in_file):
id = np.array([]) id = np.array([])
for j in feats: for j in feats:
id = np.append(id,df[j][i]) id = np.append(id,df[j][i])
app_list = np.array(df["app_list"][i].split(","))
features = tf.train.Features(feature={ features = tf.train.Features(feature={
"y": tf.train.Feature(float_list=tf.train.FloatList(value=[df["y"][i]])), "y": tf.train.Feature(float_list=tf.train.FloatList(value=[df["y"][i]])),
"z": tf.train.Feature(float_list=tf.train.FloatList(value=[df["z"][i]])), "z": tf.train.Feature(float_list=tf.train.FloatList(value=[df["z"][i]])),
"ids": tf.train.Feature(int64_list=tf.train.Int64List(value=id.astype(np.int))) "ids": tf.train.Feature(int64_list=tf.train.Int64List(value=id.astype(np.int))),
"app_list":tf.train.Feature(int64_list=tf.train.Int64List(value=app_list.astype(np.int)))
}) })
example = tf.train.Example(features = features) example = tf.train.Example(features = features)
......
...@@ -53,7 +53,8 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False): ...@@ -53,7 +53,8 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
features = { features = {
"y": tf.FixedLenFeature([], tf.float32), "y": tf.FixedLenFeature([], tf.float32),
"z": tf.FixedLenFeature([], tf.float32), "z": tf.FixedLenFeature([], tf.float32),
"ids": tf.FixedLenFeature([11], tf.int64) "ids": tf.FixedLenFeature([11], tf.int64),
"app_list": tf.VarLenFeature(tf.int64)
} }
parsed = tf.parse_single_example(record, features) parsed = tf.parse_single_example(record, features)
...@@ -99,6 +100,7 @@ def model_fn(features, labels, mode, params): ...@@ -99,6 +100,7 @@ def model_fn(features, labels, mode, params):
Feat_Emb = tf.get_variable(name='embeddings', shape=[feature_size, embedding_size], initializer=tf.glorot_normal_initializer()) Feat_Emb = tf.get_variable(name='embeddings', shape=[feature_size, embedding_size], initializer=tf.glorot_normal_initializer())
feat_ids = features['ids'] feat_ids = features['ids']
app_list = features['app_list']
if FLAGS.task_type != "infer": if FLAGS.task_type != "infer":
y = labels['y'] y = labels['y']
...@@ -107,8 +109,10 @@ def model_fn(features, labels, mode, params): ...@@ -107,8 +109,10 @@ def model_fn(features, labels, mode, params):
#------build f(x)------ #------build f(x)------
with tf.variable_scope("Shared-Embedding-layer"): with tf.variable_scope("Shared-Embedding-layer"):
embedding_id = tf.nn.embedding_lookup(Feat_Emb,feat_ids) embedding_id = tf.nn.embedding_lookup(Feat_Emb,feat_ids)
app_id = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=app_list, sp_weights=None, combiner="sum")
x_concat = tf.reshape(embedding_id,shape=[-1, common_dims]) # None * (F * K) # x_concat = tf.reshape(embedding_id,shape=[-1, common_dims]) # None * (F * K)
x_concat = tf.concat([tf.reshape(embedding_id, shape=[-1, common_dims]),app_id], axis=1)
with tf.name_scope("CVR_Task"): with tf.name_scope("CVR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN: if mode == tf.estimator.ModeKeys.TRAIN:
......
...@@ -22,7 +22,6 @@ def test(): ...@@ -22,7 +22,6 @@ def test():
def esmm_pre(): def esmm_pre():
yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d") yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
print(yesterday) print(yesterday)
addresses = "10.66.157.22:2379"
spark = SparkSession.builder.enableHiveSupport().getOrCreate() spark = SparkSession.builder.enableHiveSupport().getOrCreate()
# gw = SparkContext._gateway # gw = SparkContext._gateway
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment