Commit 07dc32f1 authored by 王志伟
parents 16dc7afe 7c856a21
......@@ -264,7 +264,7 @@ def get_predict_set(ucity_id,model,ccity_name,manufacturer,channel):
if __name__ == "__main__":
path = "/home/gaoyazhe/data/"
path = "/home/gmuser/esmm_data/"
a = time.time()
df, validate_date, ucity_id,ccity_name,manufacturer,channel = get_data()
model = transform(df, validate_date)
......
......@@ -11,7 +11,7 @@ my_user='gaoyazhe@igengmei.com'
def mail():
ret=True
try:
with open('/home/gaoyazhe/data/submit.log') as f:
with open('/home/gmuser/esmm_data/submit.log') as f:
stat_data = f.read()
msg=MIMEText(stat_data,'plain','utf-8')
msg['From']=formataddr(["高雅喆",my_sender])
......
......@@ -19,15 +19,16 @@ def con_sql(sql):
return result
def set_join(lst):
return ','.join([str(i) for i in list(lst)])
# return ','.join([str(i) for i in list(lst)])
return ','.join([str(i) for i in lst.unique().tolist()])
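For illustration (not part of the commit): the rewritten set_join joins only the unique values of the pandas Series it receives from groupby/agg, so duplicate cid_ids collapse to a single entry. A minimal sketch with made-up ids:

import pandas as pd

def set_join_unique(lst):
    # lst is the pandas Series of cid_id strings for one (uid, city) group
    return ','.join(str(i) for i in lst.unique().tolist())

print(set_join_unique(pd.Series(["16215222", "16204965", "16215222"])))
# -> 16215222,16204965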
def main():
# native queue
df2 = pd.read_csv('/home/gaoyazhe/data/native.csv',usecols=[0,1,2],header=0,names=['uid','city','cid_id'],sep='\t')
df2['cid_id'] = df2['cid_id'].astype('object')
df2 = pd.read_csv('/home/gmuser/esmm_data/native.csv',usecols=[0,1,2],header=0,names=['uid','city','cid_id'],sep='\t')
df2['cid_id'] = df2['cid_id'].astype(str)
df1 = pd.read_csv("/home/gaoyazhe/data/native/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df1 = pd.read_csv("/home/gmuser/esmm_data/native/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df2["ctr"],df2["cvr"],df2["ctcvr"] = df1["ctr"],df1["cvr"],df1["ctcvr"]
df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
df3.columns = ["device_id","city_id","native_queue"]
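A toy pandas sketch (values invented) of the pattern used here: attach the predicted scores, order each (uid, city) group by ctcvr descending, and collapse cid_id into one comma-separated queue string:

import pandas as pd

df = pd.DataFrame({"uid": ["u1", "u1", "u1"],
                   "city": ["beijing", "beijing", "beijing"],
                   "cid_id": ["101", "102", "103"],
                   "ctcvr": [0.2, 0.5, 0.1]})
queue = (df.sort_values("ctcvr", ascending=False)
           .groupby(["uid", "city"])["cid_id"]
           .agg(lambda s: ",".join(s.unique().tolist()))
           .reset_index())
print(queue["cid_id"].iloc[0])  # 102,101,103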
......@@ -35,10 +36,10 @@ def main():
# nearby queue
df2 = pd.read_csv('/home/gaoyazhe/data/nearby.csv',usecols=[0,1,2],header=0,names=['uid','city','cid_id'],sep='\t')
df2['cid_id'] = df2['cid_id'].astype('object')
df2 = pd.read_csv('/home/gmuser/esmm_data/nearby.csv',usecols=[0,1,2],header=0,names=['uid','city','cid_id'],sep='\t')
df2['cid_id'] = df2['cid_id'].astype(str)
df1 = pd.read_csv("/home/gaoyazhe/data/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df1 = pd.read_csv("/home/gmuser/esmm_data/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df2["ctr"], df2["cvr"], df2["ctcvr"] = df1["ctr"], df1["cvr"], df1["ctcvr"]
df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
df4.columns = ["device_id","city_id","nearby_queue"]
......@@ -46,6 +47,8 @@ def main():
#union
df_all = pd.merge(df3,df4,on=['device_id','city_id'],how='outer').fillna("")
df_all['device_id'] = df_all['device_id'].astype(str)
df_all['city_id'] = df_all['city_id'].astype(str)
ctime = int(time.time())
df_all["time"] = ctime
print("union_device_count",df_all.shape)
......@@ -62,7 +65,8 @@ def main():
engine = create_engine(str(r"mysql+mysqldb://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
try:
df_merge = df_all[['device_id','city_id']].apply(lambda x: ''.join(x),axis=1)
# df_merge = df_all[['device_id','city_id']].apply(lambda x: ''.join(x),axis=1)
df_merge = df_all['device_id'] + df_all['city_id']
df_merge_str = (str(list(df_merge.values))).strip('[]')
delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(df_merge_str)
con = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
......
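Sketch of the cleanup key built above (illustration only, toy device/city values): the new code forms device_id+city_id by plain column concatenation instead of an apply, then deletes any existing rows with the same concatenated key before appending the fresh queues.

import pandas as pd

df_all = pd.DataFrame({"device_id": ["d1", "d2"], "city_id": ["beijing", "shanghai"]})
df_merge = df_all["device_id"] + df_all["city_id"]      # "d1beijing", "d2shanghai"
df_merge_str = str(list(df_merge.values)).strip("[]")   # "'d1beijing', 'd2shanghai'"
delete_str = ("delete from esmm_device_diary_queue "
              "where concat(device_id,city_id) in ({0})").format(df_merge_str)
print(delete_str)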
#! /bin/bash
PYTHON_PATH=/home/gaoyazhe/miniconda3/bin/python
MODEL_PATH=/srv/apps/ffm-baseline/eda/esmm
DATA_PATH=/home/gaoyazhe/data
DATA_PATH=/home/gmuser/esmm_data
echo "start time"
current=$(date "+%Y-%m-%d %H:%M:%S")
......@@ -14,15 +14,15 @@ rm ${DATA_PATH}/tr/*
rm ${DATA_PATH}/va/*
rm ${DATA_PATH}/native/*
rm ${DATA_PATH}/nearby/*
rm -r ${DATA_PATH}/model_ckpt/DeepCvrMTL/2018*
rm -r ${DATA_PATH}/model_ckpt/DeepCvrMTL/201*
echo "data2ffm"
${PYTHON_PATH} ${MODEL_PATH}/Feature_pipline/data2ffm.py > ${DATA_PATH}/infer.log
all_sample=$((`cat ${DATA_PATH}/tr.csv | awk -F '\t' '{print$5}' | awk -F ',' '{print$2$3$4}' | sort | uniq | wc -l`))
uniq_feat=$((`cat ${DATA_PATH}/tr.csv | awk -F '\t' '{print$5}' | awk -F ',' '{print$4}' | sort | uniq -u | wc -l`))
repe_feat=$((all_sample-uniq_feat))
echo "Bayes Error Rate" : $((repe_feat*100/all_sample))%
uniq_feat=$((`cat ${DATA_PATH}/tr.csv | awk -F '\t' '{print$5}' | awk -F ',' '{print$4}' | sort | uniq -u | wc -l`))
repe_feat=$((all_sample-uniq_feat))
echo "Bayes Error Rate": $((repe_feat*100/all_sample))%
echo "split data"
split -l $((`wc -l < ${DATA_PATH}/tr.csv`/15)) ${DATA_PATH}/tr.csv -d -a 4 ${DATA_PATH}/tr/tr_ --additional-suffix=.csv
......@@ -53,7 +53,7 @@ currentTimeStamp=$((timeStamp*1000+`date "+%N"`/1000000))
echo $current
echo "train..."
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=8 --feature_size=354332 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=8 --feature_size=2000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train
echo "train time"
current=$(date "+%Y-%m-%d %H:%M:%S")
......@@ -62,11 +62,11 @@ currentTimeStamp=$((timeStamp*1000+`date "+%N"`/1000000))
echo $current
echo "infer native..."
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=8 --feature_size=354332 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer > ${DATA_PATH}/infer.log
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=8 --feature_size=2000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer > ${DATA_PATH}/infer.log
echo "infer nearby..."
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=8 --feature_size=354332 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer > ${DATA_PATH}/infer.log
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=8 --feature_size=2000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer > ${DATA_PATH}/infer.log
echo "sort and 2sql"
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/sort_and_2sql.py
......
......@@ -209,9 +209,20 @@ object EsmmData {
|and d.partition_date='${stat_date_not}'
""".stripMargin
)
// union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
union_data_scity_id.show()
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id, table="esmm_train_data",SaveMode.Append)
val union_data_scity_id2 = sc.sql(
s"""
|select device_id,cid_id,first(stat_date) stat_date,first(ucity_id) ucity_id,first(diary_service_id) diary_service_id,first(y) y,
|first(z) z,first(clevel1_id) clevel1_id,first(slevel1_id) slevel1_id,first(ccity_name) ccity_name,first(scity_id) scity_id
|from union_data_scity_id
|group by device_id,cid_id
""".stripMargin
)
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_train_data",SaveMode.Append)
} else {
println("esmm_train_data already have param.date data")
......@@ -421,10 +432,10 @@ object EsmmPredData {
val union_data_scity_id2 = sc.sql(
s"""
|select device_id,cid_id,first(stat_date) stat_date,first(ucity_id) ucity_id,first(label) label,first(diary_service_id)diary_service_id,first(y) y,
|select device_id,cid_id,first(stat_date) stat_date,first(ucity_id) ucity_id,label,first(diary_service_id)diary_service_id,first(y) y,
|first(z) z,first(clevel1_id) clevel1_id,first(slevel1_id) slevel1_id,first(ccity_name) ccity_name,first(scity_id) scity_id
|from union_data_scity_id
|group by device_id,cid_id
|group by device_id,cid_id,label
""".stripMargin
)
......@@ -661,7 +672,7 @@ object GetLevelCount {
val stat_date = GmeiConfig.getMinusNDate(1).replace("-","")
// val diary_queue = sc.read.json(param.path).rdd.map(x => x(0).toString).distinct().collect().toList.mkString(",")
val diary_queue = "16283654,16211979,15331340,15534094,13602830,16228368,16211987,15990809,16234522,15386660,15843365,15759398,16306213,15597614,15298621,16134214,15302730,15652939,16193613,15269965,15302734,15466578,15386706,15491159,16101468,15515751,12777586,16304243,15521916,15978625,16435351,15650970,15712411,15544488,15294642,16277682,16425141,16203962,15202492,15386814,15474889,15636685,16101582,16251087,15300823,15300825,15345884,16257252,16214244,16234732,16056557,15247597,16199918,15378686,16267518,16240897,16195843,16220434,16257303,16232729,15491360,16199977,15391028,15491383,15628603,16089403,16357697,16339269,16298324,14969178,15573339,16193883,16419166,15360356,15573353,16132458,14229868,15475055,16234869,15827317,16413055,16298367,16425343,16193927,13986185,16361866,15475082,16245135,15292816,16425364,15544727,16116121,16085403,16136607,16253346,15419823,15481272,16202171,16431553,16419272,15385035,16269779,16417251,15954409,15890925,15731191,16261624,16157187,16130565,15206918,14492168,16294414,15729169,16419346,15479315,16054807,16175641,15239718,15299111,15309353,16173613,15231542,16269882,16251451,16353856,16228931,16300613,15346247,15874634,16308812,16134739,15577701,16208485,15420015,15628919,16061066,16140950,16122519,15751833,16298666,16282308,16141002,16239307,15841996,15565517,12747475,16134867,16122580,16083671,15485655,15196891,16134876,16202461,16202460,16354020,15903463,15082216,15842031,15299312,16397053,15430398,15506175,15387395,16177932,16272144,15891227,16098076,16255792,15594296,14971705,15649596,16118595,16294724,15741766,15287122,15387482,16108382,15477602,16354162,15764357,15883142,15887237,16450441,15477641,16049036,15371151,15276945,15416220,15471518,15360927,15246246,15748007,15578022,15195049,15860650,15489962,16163758,16214959,15332272,16049074,16055221,16296887,15881144,15256507,16200635,15272899,16272333,15338446,16376782,13278160,15385553,15967185,15338448,15467478,15299545,16397281,15461348,12284918,15901687,15361021,15266817,16114690,15625223,15256584,16194569,16194571,15950864,16204819,16049173,15531030,15397912,15883288,15604767,15729700,15504420,15987751,15572010,15615019,16403502,16225332,15891509,15778870,15903804,15545409,15569985,16297034,15572042,15572044,16198733,15545422,15946834,16116818,15336535,16116832,15336547,16266340,16323686,16116854,15621245,15414398,16297085,16116869,16221320,15887497,16225416,16112786,16225427,16123026,16430232,16204953,15895704,16434331,15545497,15912093,16299168,16059552,16204980,15299765,15420603,16399555,15639757,16084175,15361235,15633625,16116953,16065775,16233712,15856889,15375611,16194812,15594747,15609095,15594779,16262442,15420718,16035120,16137522,16405818,15420734,16233792,15570251,15967572,16266581,15639895,16084313,16293219,15592807,16371047,16422248,16246122,16153967,16131449,15349114,15746428,15746434,15297929,15527308,16145806,16317847,16061852,16246173,15912356,13163949,15429039,16041397,16197047,15803831,16207296,15443404,16121301,16127449,16213470,16115168,15629799,15336944,16338429,15629822,15750663,16129543,15568395,15564307,15646229,15441430,15369765,16354853,15441449,15576619,16301612,16199213,16215596,15644209,15994422,16258615,15482427,16096830,15595074,16299587,15414853,15418950,16268873,15988304,16084561,16305752,15603296,15328874,16399988,15877749,16354954,15949451,14542485,16219798,16107161,15345305,15990434,16400037,15720101,16035495,15859365,16375466,15214253,15769263,15328957,15976127,15769280,15519424,16238276,15576775,15253194,16197323,15261387,15591116,16197330,15390421
,15306456,15388381,15515359,16258786,16258787,15519458,15990507,16258797,15519472,16166642,15904499,15199988,15990518,15748854,16422648,15533817,16140026,16004862,15986431,15296256,15910656,16193282,15714050,15931142,15834886,16049931,15232783,16426770,16115479,15519511,15519517,16228125,16424738,16297765,16162597,16142120,15980332,15458095,16244538,15580990,15988542,15398719,16269126,16119624,15458127,15966031,16420691,15880026,16185182,16406366,15880033,15880036,15521638,16088936,15533937,16213880,16111482,16199552,15513474,15961993,15986570,15970190,15644562,16138136,16424856,15490981,15402927,16406450,15511478,15747009,15632328,16068554,15966159,15271888,15302622,16191459,16222181,15890407,15966189,16275439,15237104,16424945,16300020,15300599,16050175"
val diary_queue = "16215222,16204965,15361235,16121397,16277565,15491159,16299587,16296887,15294642,16204934,15649199,16122580,16122580,16122580,16122580,16122580,16122580"
val diary_level1 = sc.sql(
s"""
|select diary_id,explode(split(level1_ids,';')) level1_id from diary_feat
......
from pyspark.sql import SQLContext
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
import pandas as pd
import pymysql
import datetime
from pyspark.sql import HiveContext
def get_data(day):
sc = SparkContext(conf=SparkConf().setAppName("multi_task")).getOrCreate()
sc.setLogLevel("WARN")
ctx = SQLContext(sc)
end_date = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
start_date = (datetime.date.today() - datetime.timedelta(days=day)).strftime("%Y-%m-%d")
dbtable = "(select device_id,cid_id,stat_date from data_feed_click " \
"where stat_date >= '{}' and stat_date <= '{}')tmp".format(start_date, end_date)
click = ctx.read.format("jdbc").options(url="jdbc:mysql://10.66.157.22:4000/jerry_prod",
driver="com.mysql.jdbc.Driver",
dbtable=dbtable,
user="root",
password="3SYz54LS9#^9sBvC").load()
click.show(6)
click = click.rdd.map(lambda x:(x[0],x[1],x[2]))
device_id = tuple(click.map(lambda x:x[0]).collect())
print(device_id[0:2])
dbtable = "(select device_id,cid_id,stat_date from data_feed_exposure " \
"where stat_date >= '{}' and stat_date <= '{}' and device_id in {})tmp".format(start_date,end_date,device_id)
exp = ctx.read.format("jdbc").options(url="jdbc:mysql://10.66.157.22:4000/jerry_prod",
driver="com.mysql.jdbc.Driver",
dbtable=dbtable,
user="root",
password="3SYz54LS9#^9sBvC").load()
exp.show(6)
exp = exp.rdd.map(lambda x:(x[0],x[1],x[2])).subtract(click).map(lambda x:((x[0],x[1],x[2]),1))\
.reduceByKey(lambda x,y:x+y).filter(lambda x:x[1] >= 3).map(lambda x:(x[0][0],x[0][1],x[0][2],0))
click = click.map(lambda x:(x[0],x[1],x[2],1))
date = click.map(lambda x:x[2]).collect()
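get_data labels a click row as a positive (1) and an exposed-but-never-clicked (device, cid, date) triple seen at least three times as a negative (0): subtract removes the clicked triples, reduceByKey counts the remaining exposures. The same labelling in plain Python, with hypothetical triples:

from collections import Counter

clicks = {("d1", "101", "2018-12-03")}
exposures = [("d1", "101", "2018-12-03"),       # clicked, dropped by subtract
             ("d1", "102", "2018-12-03")] * 3   # exposed 3 times, never clicked

counts = Counter(t for t in exposures if t not in clicks)    # subtract + reduceByKey
negatives = [t + (0,) for t, c in counts.items() if c >= 3]  # filter(count >= 3), label 0
positives = [t + (1,) for t in clicks]                       # clicked rows, label 1
print(negatives + positives)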
def test():
sc = SparkContext(conf=SparkConf().setAppName("multi_task")).getOrCreate()
sc.setLogLevel("WARN")
ctx = SQLContext(sc)
end_date = "2018-09-10"
start_date = "2018-09-09"
dbtable = "(select device_id,cid_id,stat_date from data_feed_click " \
"limit 80)tmp".format(start_date)
click = ctx.read.format("jdbc").options(url="jdbc:mysql://192.168.15.12:4000/jerry_prod",
driver="com.mysql.jdbc.Driver",
dbtable=dbtable,
user="root",
password="").load()
click.show(6)
click = click.rdd.map(lambda x: (x[0], x[1], x[2]))
date = click.map(lambda x: x[2]).collect()
cid = click.map(lambda x: x[1]).collect()
click = click.map(lambda x:str(1)+" "+str(cid.index(x[1]))+":"+str(1)+" "+str(date.index(x[2]))+":"+str(1))
print(click.take(6))
# device_id = tuple(click.map(lambda x: x[0]).collect())
# print(device_id[0:2])
# dbtable = "(select device_id,cid_id,stat_date from data_feed_exposure " \
# "where stat_date = '{}' and device_id in {})tmp".format(start_date,device_id)
# exp = ctx.read.format("jdbc").options(url="jdbc:mysql://192.168.15.12:4000/jerry_prod",
# driver="com.mysql.jdbc.Driver",
# dbtable=dbtable,
# user="root",
# password="").load()
# exp.show(6)
# exp = exp.rdd.map(lambda x: (x[0], x[1], x[2])).subtract(click).map(lambda x: ((x[0], x[1], x[2]), 1)) \
# .reduceByKey(lambda x, y: x + y).filter(lambda x: x[1] >= 3).map(lambda x: (x[0][0], x[0][1], x[0][2], 0))
# click = click.map(lambda x: (x[0], x[1], x[2], 1))
def hive():
conf = SparkConf().setMaster("spark://10.30.181.88:7077").setAppName("My app")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
sqlContext = HiveContext(sc)
sql = "select partition_date from online.tl_hdfs_maidian_view limit 10"
my_dataframe = sqlContext.sql(sql)
my_dataframe.show(6)
import tensorflow as tf
def con_sql(db,sql):
cursor = db.cursor()
try:
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
except Exception as e:
print("exception occurred:", e)
df = pd.DataFrame()
finally:
db.close()
return df
def get_data():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_data"
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=30)).strftime("%Y-%m-%d")
print(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,cid_time.time " \
"from esmm_train_data e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id " \
"where e.stat_date >= '{}'".format(start)
df = con_sql(db, sql)
print(df.shape)
df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "time"})
print("esmm data ok")
print(df.head(2))
df = df.fillna("na")
print(df.count())
ucity_id = {v:i for i,v in enumerate(df["ucity_id"].unique())}
clevel1_id = {v:i for i,v in enumerate(df["clevel1_id"].unique())}
ccity_name = {v:i for i,v in enumerate(df["ccity_name"].unique())}
device_type = {v:i for i,v in enumerate(df["device_type"].unique())}
manufacturer = {v:i for i,v in enumerate(df["manufacturer"].unique())}
channel = {v:i for i,v in enumerate(df["channel"].unique())}
top = {v:i for i,v in enumerate(df["top"].unique())}
time = {v:i for i,v in enumerate(df["time"].unique())}
df["ucity_id"] = df["ucity_id"].map(ucity_id)
df["clevel1_id"] = df["clevel1_id"].map(clevel1_id)
df["ccity_name"] = df["ccity_name"].map(ccity_name)
df["device_type"] = df["device_type"].map(device_type)
df["manufacturer"] = df["manufacturer"].map(manufacturer)
df["channel"] = df["channel"].map(channel)
df["top"] = df["top"].map(top)
df["time"] = df["time"].map(time)
train = df.loc[df["stat_date"] == validate_date]
test = df.loc[df["stat_date"] != validate_date]
features = ["ucity_id","clevel1_id","ccity_name","device_type","manufacturer","channel","top","time"]
train_values = train[features].values
train_labels = train[["y","z"]].values
test_values = test[features].values
test_labels = test[["y","z"]].values
ucity_id_max = len(ucity_id)
clevel1_id_max = len(clevel1_id)
ccity_name_max = len(ccity_name)
device_type_max = len(device_type)
manufacturer_max = len(manufacturer)
channel_max = len(channel)
top_max = len(top)
time_max = len(time)
return train_values,train_labels,test_values,test_labels,ucity_id_max,clevel1_id_max,ccity_name_max,\
device_type_max,manufacturer_max,channel_max,top_max,time_max
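Each categorical column is mapped to consecutive integer ids, and the resulting vocabulary size later fixes the number of rows in the matching embedding matrix. A toy example of that encoding (city names invented):

import pandas as pd

s = pd.Series(["beijing", "shanghai", "beijing"])
vocab = {v: i for i, v in enumerate(s.unique())}   # {"beijing": 0, "shanghai": 1}
encoded = s.map(vocab)                             # 0, 1, 0
vocab_size = len(vocab)                            # rows needed in the embedding matrix
print(vocab, encoded.tolist(), vocab_size)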
def get_inputs():
ucity_id = tf.placeholder(tf.int32, [None, 1], name="ucity_id")
clevel1_id = tf.placeholder(tf.int32, [None, 1], name="clevel1_id")
ccity_name = tf.placeholder(tf.int32, [None, 1], name="ccity_name")
device_type = tf.placeholder(tf.int32, [None, 1], name="device_type")
manufacturer = tf.placeholder(tf.int32, [None, 1], name="manufacturer")
channel = tf.placeholder(tf.int32, [None, 1], name="channel")
top = tf.placeholder(tf.int32, [None, 1], name="top")
time = tf.placeholder(tf.int32, [None, 1], name="time")
targets = tf.placeholder(tf.float32, [None, 2], name="targets")
LearningRate = tf.placeholder(tf.float32, name="LearningRate")
return ucity_id,clevel1_id,ccity_name,device_type,manufacturer,channel,top,time,targets,LearningRate
def define_embedding_layers(combiner,embed_dim,ucity_id, ucity_id_max, clevel1_id_max,clevel1_id,
ccity_name_max,ccity_name,device_type_max,device_type,manufacturer_max,
manufacturer,channel,channel_max,top,top_max,time,time_max):
ucity_id_embed_matrix = tf.Variable(tf.random_normal([ucity_id_max, embed_dim], 0, 0.001))
ucity_id_embed_layer = tf.nn.embedding_lookup(ucity_id_embed_matrix, ucity_id)
if combiner == "sum":
ucity_id_embed_layer = tf.reduce_sum(ucity_id_embed_layer, axis=1, keep_dims=True)
clevel1_id_embed_matrix = tf.Variable(tf.random_uniform([clevel1_id_max, embed_dim], 0, 0.001))
clevel1_id_embed_layer = tf.nn.embedding_lookup(clevel1_id_embed_matrix, clevel1_id)
if combiner == "sum":
clevel1_id_embed_layer = tf.reduce_sum(clevel1_id_embed_layer, axis=1, keep_dims=True)
ccity_name_embed_matrix = tf.Variable(tf.random_uniform([ccity_name_max, embed_dim], 0, 0.001))
ccity_name_embed_layer = tf.nn.embedding_lookup(ccity_name_embed_matrix,ccity_name)
if combiner == "sum":
ccity_name_embed_layer = tf.reduce_sum(ccity_name_embed_layer, axis=1, keep_dims=True)
device_type_embed_matrix = tf.Variable(tf.random_uniform([device_type_max, embed_dim], 0, 0.001))
device_type_embed_layer = tf.nn.embedding_lookup(device_type_embed_matrix, device_type)
if combiner == "sum":
device_type_embed_layer = tf.reduce_sum(device_type_embed_layer, axis=1, keep_dims=True)
manufacturer_embed_matrix = tf.Variable(tf.random_uniform([manufacturer_max, embed_dim], 0, 0.001))
manufacturer_embed_layer = tf.nn.embedding_lookup(manufacturer_embed_matrix, manufacturer)
if combiner == "sum":
manufacturer_embed_layer = tf.reduce_sum(manufacturer_embed_layer, axis=1, keep_dims=True)
channel_embed_matrix = tf.Variable(tf.random_uniform([channel_max, embed_dim], 0, 0.001))
channel_embed_layer = tf.nn.embedding_lookup(channel_embed_matrix, channel)
if combiner == "sum":
channel_embed_layer = tf.reduce_sum(channel_embed_layer, axis=1, keep_dims=True)
top_embed_matrix = tf.Variable(tf.random_uniform([top_max, embed_dim], 0, 0.001))
top_embed_layer = tf.nn.embedding_lookup(top_embed_matrix, top)
if combiner == "sum":
top_embed_layer = tf.reduce_sum(top_embed_layer, axis=1, keep_dims=True)
time_embed_matrix = tf.Variable(tf.random_uniform([time_max, embed_dim], 0, 0.001))
time_embed_layer = tf.nn.embedding_lookup(time_embed_matrix, time)
if combiner == "sum":
time_embed_layer = tf.reduce_sum(time_embed_layer, axis=1, keep_dims=True)
esmm_embedding_layer = tf.concat([ucity_id_embed_layer, clevel1_id_embed_layer,ccity_name_embed_layer,
device_type_embed_layer,manufacturer_embed_layer,channel_embed_layer,
top_embed_layer,time_embed_layer], axis=1)
esmm_embedding_layer = tf.reshape(esmm_embedding_layer, [-1, embed_dim * 8])
return esmm_embedding_layer
def define_ctr_layer(esmm_embedding_layer):
ctr_layer_1 = tf.layers.dense(esmm_embedding_layer, 200, activation=tf.nn.relu)
ctr_layer_2 = tf.layers.dense(ctr_layer_1, 80, activation=tf.nn.relu)
ctr_layer_3 = tf.layers.dense(ctr_layer_2, 2) # [nonclick, click]
ctr_prob = tf.nn.softmax(ctr_layer_3) + 0.00000001
return ctr_prob
def define_cvr_layer(esmm_embedding_layer):
cvr_layer_1 = tf.layers.dense(esmm_embedding_layer, 200, activation=tf.nn.relu)
cvr_layer_2 = tf.layers.dense(cvr_layer_1, 80, activation=tf.nn.relu)
cvr_layer_3 = tf.layers.dense(cvr_layer_2, 2) # [nonbuy, buy]
cvr_prob = tf.nn.softmax(cvr_layer_3) + 0.00000001
return cvr_prob
def define_ctr_cvr_layer(esmm_embedding_layer):
layer_1 = tf.layers.dense(esmm_embedding_layer, 128 , activation=tf.nn.relu)
layer_2 = tf.layers.dense(layer_1, 16, activation=tf.nn.relu)
layer_3 = tf.layers.dense(layer_2, 2)
ctr_prob = tf.nn.softmax(layer_3) + 0.00000001
cvr_prob = tf.nn.softmax(layer_3) + 0.00000001
return ctr_prob, cvr_prob
if __name__ == '__main__':
hive()
embed_dim = 6
combiner = "sum"
train_values, train_labels, test_values, test_labels, ucity_id_max, clevel1_id_max, ccity_name_max, \
device_type_max, manufacturer_max, channel_max, top_max, time_max = get_data()
tf.reset_default_graph()
train_graph = tf.Graph()
with train_graph.as_default():
ucity_id, clevel1_id, ccity_name, device_type, manufacturer, channel, top, \
time, targets, LearningRate = get_inputs()
esmm_embedding_layer = define_embedding_layers(combiner,embed_dim,ucity_id, ucity_id_max, clevel1_id_max,clevel1_id,
ccity_name_max,ccity_name,device_type_max,device_type,manufacturer_max,
manufacturer,channel,channel_max,top,top_max,time,time_max)
ctr_prob, cvr_prob = define_ctr_cvr_layer(esmm_embedding_layer)
with tf.name_scope("loss"):
ctr_prob_one = tf.slice(ctr_prob, [0, 1], [-1, 1]) # [batch_size , 1]
cvr_prob_one = tf.slice(cvr_prob, [0, 1], [-1, 1]) # [batchsize, 1 ]
ctcvr_prob_one = ctr_prob_one * cvr_prob_one # [ctr*cvr]
ctcvr_prob = tf.concat([1 - ctcvr_prob_one, ctcvr_prob_one], axis=1)
ctr_label = tf.slice(targets, [0, 0], [-1, 1]) # target: [click, buy]
ctr_label = tf.concat([1 - ctr_label, ctr_label], axis=1) # [1-click, click]
cvr_label = tf.slice(targets, [0, 1], [-1, 1])
ctcvr_label = tf.concat([1 - cvr_label, cvr_label], axis=1)
# single column: check whether Click == 1
ctr_clk = tf.slice(targets, [0, 0], [-1, 1])
ctr_clk_dup = tf.concat([ctr_clk, ctr_clk], axis=1)
# clicked subset CVR loss
cvr_loss = - tf.multiply(tf.log(cvr_prob) * ctcvr_label, ctr_clk_dup)
# batch CTR loss
ctr_loss = - tf.log(ctr_prob) * ctr_label # -y*log(p)-(1-y)*log(1-p)
# batch CTCVR loss
ctcvr_loss = - tf.log(ctcvr_prob) * ctcvr_label
# loss = tf.reduce_mean(ctr_loss + ctcvr_loss + cvr_loss)
# loss = tf.reduce_mean(ctr_loss + ctcvr_loss)
# loss = tf.reduce_mean(ctr_loss + cvr_loss)
loss = tf.reduce_mean(cvr_loss)
ctr_loss = tf.reduce_mean(ctr_loss)
cvr_loss = tf.reduce_mean(cvr_loss)
ctcvr_loss = tf.reduce_mean(ctcvr_loss)
# optimize the loss
# train_op = tf.train.AdamOptimizer(lr).minimize(loss) #cost
global_step = tf.Variable(0, name="global_step", trainable=False)
optimizer = tf.train.AdamOptimizer(lr)
gradients = optimizer.compute_gradients(loss) # cost
train_op = optimizer.apply_gradients(gradients, global_step=global_step)
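Numerically, the loss block computes pCTCVR = pCTR * pCVR over the whole batch, a CTR cross-entropy over all impressions, a CTCVR cross-entropy against the (click and buy) label, and a CVR cross-entropy masked by the click column so only clicked impressions contribute; the active loss here keeps just the masked CVR term. A numpy sketch of the same arithmetic on a made-up batch of two impressions:

import numpy as np

eps = 1e-8
ctr_prob = np.array([[0.3, 0.7], [0.9, 0.1]]) + eps   # [non-click, click]
cvr_prob = np.array([[0.4, 0.6], [0.8, 0.2]]) + eps   # [non-buy, buy]
targets  = np.array([[1.0, 1.0], [0.0, 0.0]])         # columns: [click, buy]

ctcvr_one = ctr_prob[:, 1:] * cvr_prob[:, 1:]                    # p(click & buy)
ctcvr_prob = np.concatenate([1 - ctcvr_one, ctcvr_one], axis=1)
ctr_label = np.concatenate([1 - targets[:, :1], targets[:, :1]], axis=1)
ctcvr_label = np.concatenate([1 - targets[:, 1:], targets[:, 1:]], axis=1)
clk_mask = np.concatenate([targets[:, :1]] * 2, axis=1)          # ctr_clk_dup

cvr_loss   = -(np.log(cvr_prob) * ctcvr_label * clk_mask).mean() # clicked subset only
ctr_loss   = -(np.log(ctr_prob) * ctr_label).mean()
ctcvr_loss = -(np.log(ctcvr_prob) * ctcvr_label).mean()
print(cvr_loss, ctr_loss, ctcvr_loss)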
......@@ -156,14 +156,15 @@ def get_data():
df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id",4: "clevel1_id", 5: "ccity_name",
6:"device_type",7:"manufacturer",8:"channel",9:"top",10:"time",11:"device_id"})
print("esmm data ok")
print(df.head(2))
# print(df.head(2))
df["clevel1_id"] = df["clevel1_id"].astype("str")
df["y"] = df["y"].astype("str")
df["z"] = df["z"].astype("str")
df["top"] = df["top"].astype("str")
df["y"] = df["stat_date"].str.cat([df["device_id"].values.tolist(),df["y"].values.tolist(),df["z"].values.tolist()], sep=",")
df = df.drop(["z","stat_date","device_id"], axis=1).fillna(0.0)
df = df.drop(["z","stat_date","device_id","time"], axis=1).fillna("na")
print(df.head(2))
features = 0
for i in ["ucity_id","clevel1_id","ccity_name","device_type","manufacturer","channel"]:
......@@ -199,8 +200,9 @@ def transform(a,validate_date):
test = test.drop("stat_date",axis=1)
print("train shape")
print(train.shape)
train.to_csv(path + "tr.csv", sep="\t", index=False)
test.to_csv(path + "va.csv", sep="\t", index=False)
# train.to_csv(path + "tr.csv", sep="\t", index=False)
# test.to_csv(path + "va.csv", sep="\t", index=False)
return model
......@@ -210,7 +212,8 @@ def get_predict_set(ucity_id,model,ccity_name,manufacturer,channel):
sql = "select e.y,e.z,e.label,e.ucity_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id,e.cid_id " \
"from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id"
"left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id " \
"where e.device_id = '358035085192742'"
df = con_sql(db, sql)
df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "time",
......@@ -244,7 +247,7 @@ def get_predict_set(ucity_id,model,ccity_name,manufacturer,channel):
df["y"] = df["label"].str.cat(
[df["device_id"].values.tolist(), df["ucity_id"].values.tolist(), df["cid_id"].values.tolist(),
df["y"].values.tolist(), df["z"].values.tolist()], sep=",")
df = df.drop(["z","label","device_id","cid_id"], axis=1).fillna(0.0)
df = df.drop(["z","label","device_id","cid_id","time"], axis=1).fillna(0.0)
print("before transform")
print(df.shape)
temp_series = model.transform(df,n=160000, processes=22)
......
......@@ -65,10 +65,16 @@ def click():
two_map[i] = df.loc[df[2] == i].shape[0] / n
print(sorted(two_map.items(), key=lambda x: x[1],reverse=True))
def get_cid():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct cid_id from esmm_train_data where device_id = '358035085192742' " \
"and stat_date >= '2018-12-03'"
df = con_sql(db, sql)[0].values.tolist()
print(",".join(df))
if __name__ == "__main__":
click()
get_cid()