Commit c95cd883 authored by 张彦钊

add liuxao

parent 72bb373c
from pyspark.sql import SQLContext
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
import pandas as pd
import pymysql
import datetime
from pyspark.sql import HiveContext
def get_data(day):
    # Build labeled click/exposure samples for the last `day` days.
    sc = SparkContext.getOrCreate(SparkConf().setAppName("multi_task"))
    sc.setLogLevel("WARN")
    ctx = SQLContext(sc)
    end_date = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
    start_date = (datetime.date.today() - datetime.timedelta(days=day)).strftime("%Y-%m-%d")
    dbtable = "(select device_id,cid_id,stat_date from data_feed_click " \
              "where stat_date >= '{}' and stat_date <= '{}')tmp".format(start_date, end_date)
    click = ctx.read.format("jdbc").options(url="jdbc:mysql://10.66.157.22:4000/jerry_prod",
                                            driver="com.mysql.jdbc.Driver",
                                            dbtable=dbtable,
                                            user="root",
                                            password="3SYz54LS9#^9sBvC").load()
    click.show(6)
    click = click.rdd.map(lambda x: (x[0], x[1], x[2]))
    device_id = tuple(click.map(lambda x: x[0]).collect())
    print(device_id[0:2])
    dbtable = "(select device_id,cid_id,stat_date from data_feed_exposure " \
              "where stat_date >= '{}' and stat_date <= '{}' and device_id in {})tmp".format(start_date, end_date, device_id)
    exp = ctx.read.format("jdbc").options(url="jdbc:mysql://10.66.157.22:4000/jerry_prod",
                                          driver="com.mysql.jdbc.Driver",
                                          dbtable=dbtable,
                                          user="root",
                                          password="3SYz54LS9#^9sBvC").load()
    exp.show(6)
    # Exposures never clicked, seen at least 3 times, become negatives (label 0).
    exp = exp.rdd.map(lambda x: (x[0], x[1], x[2])).subtract(click).map(lambda x: ((x[0], x[1], x[2]), 1)) \
        .reduceByKey(lambda x, y: x + y).filter(lambda x: x[1] >= 3).map(lambda x: (x[0][0], x[0][1], x[0][2], 0))
    # Clicks become positives (label 1).
    click = click.map(lambda x: (x[0], x[1], x[2], 1))
    date = click.map(lambda x: x[2]).collect()
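# Illustrative sketch (added for clarity; not part of the original commit): the
# two RDDs built above carry label 1 for clicks and label 0 for repeated
# unclicked exposures, so a plausible way to materialize a single training set
# would be a union of the two. The helper name is hypothetical.
def _demo_union_labels(click, exp):
    """Merge positive and negative samples into one RDD of
    (device_id, cid_id, stat_date, label) tuples."""
    return click.union(exp)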
def test():
    sc = SparkContext.getOrCreate(SparkConf().setAppName("multi_task"))
    sc.setLogLevel("WARN")
    ctx = SQLContext(sc)
    end_date = "2018-09-10"
    start_date = "2018-09-09"
    dbtable = "(select device_id,cid_id,stat_date from data_feed_click limit 80)tmp"
    click = ctx.read.format("jdbc").options(url="jdbc:mysql://192.168.15.12:4000/jerry_prod",
                                            driver="com.mysql.jdbc.Driver",
                                            dbtable=dbtable,
                                            user="root",
                                            password="").load()
    click.show(6)
    click = click.rdd.map(lambda x: (x[0], x[1], x[2]))
    date = click.map(lambda x: x[2]).collect()
    cid = click.map(lambda x: x[1]).collect()
    # Encode each row as a libSVM-style line: "label cid_idx:1 date_idx:1".
    click = click.map(lambda x: str(1) + " " + str(cid.index(x[1])) + ":" + str(1)
                      + " " + str(date.index(x[2])) + ":" + str(1))
    print(click.take(6))
    # device_id = tuple(click.map(lambda x: x[0]).collect())
    # print(device_id[0:2])
    # dbtable = "(select device_id,cid_id,stat_date from data_feed_exposure " \
    #           "where stat_date = '{}' and device_id in {})tmp".format(start_date, device_id)
    # exp = ctx.read.format("jdbc").options(url="jdbc:mysql://192.168.15.12:4000/jerry_prod",
    #                                       driver="com.mysql.jdbc.Driver",
    #                                       dbtable=dbtable,
    #                                       user="root",
    #                                       password="").load()
    # exp.show(6)
    # exp = exp.rdd.map(lambda x: (x[0], x[1], x[2])).subtract(click).map(lambda x: ((x[0], x[1], x[2]), 1)) \
    #     .reduceByKey(lambda x, y: x + y).filter(lambda x: x[1] >= 3).map(lambda x: (x[0][0], x[0][1], x[0][2], 0))
    # click = click.map(lambda x: (x[0], x[1], x[2], 1))
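# Example of the encoding produced in test() (added note): each click row
# becomes a libSVM-style line "label cid_index:1 date_index:1"; e.g. a row
# whose cid_id sits at position 42 of `cid` and whose stat_date sits at
# position 3 of `date` yields the string "1 42:1 3:1".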
def hive():
    conf = SparkConf().setMaster("spark://10.30.181.88:7077").setAppName("My app")
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")
    sqlContext = HiveContext(sc)
    sql = "select partition_date from online.tl_hdfs_maidian_view limit 10"
    my_dataframe = sqlContext.sql(sql)
    my_dataframe.show(6)
import tensorflow as tf
def con_sql(db, sql):
    # Run a query and return the result as a DataFrame; the connection is
    # always closed, so callers must reconnect for each query.
    cursor = db.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
        df = pd.DataFrame(list(result))
    except Exception as e:
        print("An exception occurred:", e)
        df = pd.DataFrame()
    finally:
        db.close()
    return df
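# Illustrative usage of con_sql (added; the query is a made-up example, the
# connection parameters mirror those used elsewhere in this file):
#   db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
#                        passwd='3SYz54LS9#^9sBvC', db='jerry_test')
#   df = con_sql(db, "select count(*) from esmm_train_data")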
def get_data():
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select max(stat_date) from esmm_train_data"
    validate_date = con_sql(db, sql)[0].values.tolist()[0]
    print("validate_date:" + validate_date)
    temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
    start = (temp - datetime.timedelta(days=30)).strftime("%Y-%m-%d")
    print(start)
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name," \
          "u.device_type,u.manufacturer,u.channel,c.top,cid_time.time " \
          "from esmm_train_data e left join user_feature u on e.device_id = u.device_id " \
          "left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id " \
          "where e.stat_date >= '{}'".format(start)
    df = con_sql(db, sql)
    print(df.shape)
    df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
                            6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "time"})
    print("esmm data ok")
    print(df.head(2))
    df = df.fillna("na")
    print(df.count())
    # Map each categorical value to a dense integer id.
    ucity_id = {v: i for i, v in enumerate(df["ucity_id"].unique())}
    clevel1_id = {v: i for i, v in enumerate(df["clevel1_id"].unique())}
    ccity_name = {v: i for i, v in enumerate(df["ccity_name"].unique())}
    device_type = {v: i for i, v in enumerate(df["device_type"].unique())}
    manufacturer = {v: i for i, v in enumerate(df["manufacturer"].unique())}
    channel = {v: i for i, v in enumerate(df["channel"].unique())}
    top = {v: i for i, v in enumerate(df["top"].unique())}
    time = {v: i for i, v in enumerate(df["time"].unique())}
    df["ucity_id"] = df["ucity_id"].map(ucity_id)
    df["clevel1_id"] = df["clevel1_id"].map(clevel1_id)
    df["ccity_name"] = df["ccity_name"].map(ccity_name)
    df["device_type"] = df["device_type"].map(device_type)
    df["manufacturer"] = df["manufacturer"].map(manufacturer)
    df["channel"] = df["channel"].map(channel)
    df["top"] = df["top"].map(top)
    df["time"] = df["time"].map(time)
    train = df.loc[df["stat_date"] == validate_date]
    test = df.loc[df["stat_date"] != validate_date]
    features = ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time"]
    train_values = train[features].values
    train_labels = train[["y", "z"]].values
    test_values = test[features].values
    test_labels = test[["y", "z"]].values
    # Vocabulary sizes bound the embedding tables built later.
    ucity_id_max = len(ucity_id)
    clevel1_id_max = len(clevel1_id)
    ccity_name_max = len(ccity_name)
    device_type_max = len(device_type)
    manufacturer_max = len(manufacturer)
    channel_max = len(channel)
    top_max = len(top)
    time_max = len(time)
    return train_values, train_labels, test_values, test_labels, ucity_id_max, clevel1_id_max, ccity_name_max, \
        device_type_max, manufacturer_max, channel_max, top_max, time_max
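# Worked example of the categorical encoding above (added; values are made up):
# if df["channel"].unique() returns ["AppStore", "xiaomi", "huawei"], then
# channel == {"AppStore": 0, "xiaomi": 1, "huawei": 2}, df["channel"].map(channel)
# replaces each raw value with its integer id, and channel_max == 3 sizes the
# corresponding embedding table.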
def get_inputs():
    ucity_id = tf.placeholder(tf.int32, [None, 1], name="ucity_id")
    clevel1_id = tf.placeholder(tf.int32, [None, 1], name="clevel1_id")
    ccity_name = tf.placeholder(tf.int32, [None, 1], name="ccity_name")
    device_type = tf.placeholder(tf.int32, [None, 1], name="device_type")
    manufacturer = tf.placeholder(tf.int32, [None, 1], name="manufacturer")
    channel = tf.placeholder(tf.int32, [None, 1], name="channel")
    top = tf.placeholder(tf.int32, [None, 1], name="top")
    time = tf.placeholder(tf.int32, [None, 1], name="time")
    targets = tf.placeholder(tf.float32, [None, 2], name="targets")
    LearningRate = tf.placeholder(tf.float32, name="LearningRate")
    return ucity_id, clevel1_id, ccity_name, device_type, manufacturer, channel, top, time, targets, LearningRate
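# Illustrative feed sketch for the placeholders above (added; the batch column
# order follows the `features` list in get_data, and the learning-rate value
# is an assumption):
#   feed = {ucity_id: batch[:, 0:1], clevel1_id: batch[:, 1:2],
#           ccity_name: batch[:, 2:3], device_type: batch[:, 3:4],
#           manufacturer: batch[:, 4:5], channel: batch[:, 5:6],
#           top: batch[:, 6:7], time: batch[:, 7:8],
#           targets: labels, LearningRate: 0.001}
#   sess.run(train_op, feed_dict=feed)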
def define_embedding_layers(combiner, embed_dim, ucity_id, ucity_id_max, clevel1_id_max, clevel1_id,
                            ccity_name_max, ccity_name, device_type_max, device_type, manufacturer_max,
                            manufacturer, channel, channel_max, top, top_max, time, time_max):
    # One embedding table per categorical feature. With combiner == "sum" each
    # lookup is reduced over the id axis, keeping shape [batch, 1, embed_dim].
    # (The ucity_id table uses a normal initializer; the rest use uniform.)
    ucity_id_embed_matrix = tf.Variable(tf.random_normal([ucity_id_max, embed_dim], 0, 0.001))
    ucity_id_embed_layer = tf.nn.embedding_lookup(ucity_id_embed_matrix, ucity_id)
    if combiner == "sum":
        ucity_id_embed_layer = tf.reduce_sum(ucity_id_embed_layer, axis=1, keep_dims=True)
    clevel1_id_embed_matrix = tf.Variable(tf.random_uniform([clevel1_id_max, embed_dim], 0, 0.001))
    clevel1_id_embed_layer = tf.nn.embedding_lookup(clevel1_id_embed_matrix, clevel1_id)
    if combiner == "sum":
        clevel1_id_embed_layer = tf.reduce_sum(clevel1_id_embed_layer, axis=1, keep_dims=True)
    ccity_name_embed_matrix = tf.Variable(tf.random_uniform([ccity_name_max, embed_dim], 0, 0.001))
    ccity_name_embed_layer = tf.nn.embedding_lookup(ccity_name_embed_matrix, ccity_name)
    if combiner == "sum":
        ccity_name_embed_layer = tf.reduce_sum(ccity_name_embed_layer, axis=1, keep_dims=True)
    device_type_embed_matrix = tf.Variable(tf.random_uniform([device_type_max, embed_dim], 0, 0.001))
    device_type_embed_layer = tf.nn.embedding_lookup(device_type_embed_matrix, device_type)
    if combiner == "sum":
        device_type_embed_layer = tf.reduce_sum(device_type_embed_layer, axis=1, keep_dims=True)
    manufacturer_embed_matrix = tf.Variable(tf.random_uniform([manufacturer_max, embed_dim], 0, 0.001))
    manufacturer_embed_layer = tf.nn.embedding_lookup(manufacturer_embed_matrix, manufacturer)
    if combiner == "sum":
        manufacturer_embed_layer = tf.reduce_sum(manufacturer_embed_layer, axis=1, keep_dims=True)
    channel_embed_matrix = tf.Variable(tf.random_uniform([channel_max, embed_dim], 0, 0.001))
    channel_embed_layer = tf.nn.embedding_lookup(channel_embed_matrix, channel)
    if combiner == "sum":
        channel_embed_layer = tf.reduce_sum(channel_embed_layer, axis=1, keep_dims=True)
    top_embed_matrix = tf.Variable(tf.random_uniform([top_max, embed_dim], 0, 0.001))
    top_embed_layer = tf.nn.embedding_lookup(top_embed_matrix, top)
    if combiner == "sum":
        top_embed_layer = tf.reduce_sum(top_embed_layer, axis=1, keep_dims=True)
    time_embed_matrix = tf.Variable(tf.random_uniform([time_max, embed_dim], 0, 0.001))
    time_embed_layer = tf.nn.embedding_lookup(time_embed_matrix, time)
    if combiner == "sum":
        time_embed_layer = tf.reduce_sum(time_embed_layer, axis=1, keep_dims=True)
    # Shared ESMM input: concatenate all feature embeddings and flatten.
    esmm_embedding_layer = tf.concat([ucity_id_embed_layer, clevel1_id_embed_layer, ccity_name_embed_layer,
                                      device_type_embed_layer, manufacturer_embed_layer, channel_embed_layer,
                                      top_embed_layer, time_embed_layer], axis=1)
    esmm_embedding_layer = tf.reshape(esmm_embedding_layer, [-1, embed_dim * 8])
    return esmm_embedding_layer
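# Shape walkthrough (added note): each embedding_lookup on ids of shape
# [batch, 1] yields [batch, 1, embed_dim]; the "sum" combiner keeps that shape
# via keep_dims=True; concatenating the 8 features along axis 1 gives
# [batch, 8, embed_dim]; and the final reshape flattens it to
# [batch, 8 * embed_dim] for the dense towers below.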
def define_ctr_layer(esmm_embedding_layer):
    ctr_layer_1 = tf.layers.dense(esmm_embedding_layer, 200, activation=tf.nn.relu)
    ctr_layer_2 = tf.layers.dense(ctr_layer_1, 80, activation=tf.nn.relu)
    ctr_layer_3 = tf.layers.dense(ctr_layer_2, 2)  # [nonclick, click]
    ctr_prob = tf.nn.softmax(ctr_layer_3) + 1e-8  # epsilon guards the log in the loss
    return ctr_prob


def define_cvr_layer(esmm_embedding_layer):
    cvr_layer_1 = tf.layers.dense(esmm_embedding_layer, 200, activation=tf.nn.relu)
    cvr_layer_2 = tf.layers.dense(cvr_layer_1, 80, activation=tf.nn.relu)
    cvr_layer_3 = tf.layers.dense(cvr_layer_2, 2)  # [nonbuy, buy]
    cvr_prob = tf.nn.softmax(cvr_layer_3) + 1e-8
    return cvr_prob


def define_ctr_cvr_layer(esmm_embedding_layer):
    layer_1 = tf.layers.dense(esmm_embedding_layer, 128, activation=tf.nn.relu)
    layer_2 = tf.layers.dense(layer_1, 16, activation=tf.nn.relu)
    layer_3 = tf.layers.dense(layer_2, 2)
    ctr_prob = tf.nn.softmax(layer_3) + 1e-8
    cvr_prob = tf.nn.softmax(layer_3) + 1e-8
    return ctr_prob, cvr_prob
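# Note (added): in define_ctr_cvr_layer both heads share the same layer_3
# logits, so ctr_prob and cvr_prob are numerically identical; the separate
# define_ctr_layer / define_cvr_layer functions above build independent towers
# in the usual ESMM shared-embedding, separate-tower arrangement.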
if __name__ == '__main__':
    hive()
    embed_dim = 6
    combiner = "sum"
    train_values, train_labels, test_values, test_labels, ucity_id_max, clevel1_id_max, ccity_name_max, \
        device_type_max, manufacturer_max, channel_max, top_max, time_max = get_data()
    tf.reset_default_graph()
    train_graph = tf.Graph()
    with train_graph.as_default():
        ucity_id, clevel1_id, ccity_name, device_type, manufacturer, channel, top, \
            time, targets, LearningRate = get_inputs()
        esmm_embedding_layer = define_embedding_layers(combiner, embed_dim, ucity_id, ucity_id_max, clevel1_id_max,
                                                       clevel1_id, ccity_name_max, ccity_name, device_type_max,
                                                       device_type, manufacturer_max, manufacturer, channel,
                                                       channel_max, top, top_max, time, time_max)
        ctr_prob, cvr_prob = define_ctr_cvr_layer(esmm_embedding_layer)
        with tf.name_scope("loss"):
            ctr_prob_one = tf.slice(ctr_prob, [0, 1], [-1, 1])  # [batch_size, 1]
            cvr_prob_one = tf.slice(cvr_prob, [0, 1], [-1, 1])  # [batch_size, 1]
            ctcvr_prob_one = ctr_prob_one * cvr_prob_one  # p(ctcvr) = p(ctr) * p(cvr)
            ctcvr_prob = tf.concat([1 - ctcvr_prob_one, ctcvr_prob_one], axis=1)
            ctr_label = tf.slice(targets, [0, 0], [-1, 1])  # target: [click, buy]
            ctr_label = tf.concat([1 - ctr_label, ctr_label], axis=1)  # [1-click, click]
            cvr_label = tf.slice(targets, [0, 1], [-1, 1])
            ctcvr_label = tf.concat([1 - cvr_label, cvr_label], axis=1)
            # Single column: whether click == 1, duplicated to mask both classes.
            ctr_clk = tf.slice(targets, [0, 0], [-1, 1])
            ctr_clk_dup = tf.concat([ctr_clk, ctr_clk], axis=1)
            # CVR loss on the clicked subset only
            cvr_loss = - tf.multiply(tf.log(cvr_prob) * ctcvr_label, ctr_clk_dup)
            # batch CTR loss
            ctr_loss = - tf.log(ctr_prob) * ctr_label  # -y*log(p) - (1-y)*log(1-p)
            # batch CTCVR loss
            ctcvr_loss = - tf.log(ctcvr_prob) * ctcvr_label
            # loss = tf.reduce_mean(ctr_loss + ctcvr_loss + cvr_loss)
            # loss = tf.reduce_mean(ctr_loss + ctcvr_loss)
            # loss = tf.reduce_mean(ctr_loss + cvr_loss)
            loss = tf.reduce_mean(cvr_loss)
            ctr_loss = tf.reduce_mean(ctr_loss)
            cvr_loss = tf.reduce_mean(cvr_loss)
            ctcvr_loss = tf.reduce_mean(ctcvr_loss)
        # Optimize the loss; LearningRate is the placeholder from get_inputs.
        # train_op = tf.train.AdamOptimizer(LearningRate).minimize(loss)
        global_step = tf.Variable(0, name="global_step", trainable=False)
        optimizer = tf.train.AdamOptimizer(LearningRate)
        gradients = optimizer.compute_gradients(loss)
        train_op = optimizer.apply_gradients(gradients, global_step=global_step)
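        # ESMM recap (added summary of the loss block above): with
        # p_ctr = P(click | x) and p_cvr = P(buy | click, x), the model
        # supervises p_ctcvr = p_ctr * p_cvr against the observed (click, buy)
        # labels over all impressions; each *_loss above is a cross-entropy
        # term -y * log(p), and the loss actually minimized here is the
        # clicked-subset CVR term alone.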
@@ -156,7 +156,8 @@ def get_data():
     df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id",4: "clevel1_id", 5: "ccity_name",
                             6:"device_type",7:"manufacturer",8:"channel",9:"top",10:"time",11:"device_id"})
     print("esmm data ok")
-    print(df.head(2))
+    # print(df.head(2))
     df["clevel1_id"] = df["clevel1_id"].astype("str")
     df["y"] = df["y"].astype("str")
@@ -201,6 +202,8 @@ def transform(a,validate_date):
     print(train.shape)
     train.to_csv(path + "tr.csv", sep="\t", index=False)
     test.to_csv(path + "va.csv", sep="\t", index=False)
+    liuxiao = df[df["device_id"] == "358035085192742"]
+    liuxiao.to_csv(path + "liuxiao.csv", index=False)
     return model
@@ -210,7 +213,8 @@ def get_predict_set(ucity_id,model,ccity_name,manufacturer,channel):
     sql = "select e.y,e.z,e.label,e.ucity_id,e.clevel1_id,e.ccity_name," \
           "u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id,e.cid_id " \
           "from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
-          "left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id"
+          "left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id " \
+          "where e.device_id = '358035085192742'"
     df = con_sql(db, sql)
     df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
                             6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "time",
@@ -289,7 +293,7 @@ if __name__ == "__main__":
     a = time.time()
     temp, validate_date, ucity_id,ccity_name,manufacturer,channel = get_data()
     model = transform(temp, validate_date)
-    # get_predict_set(ucity_id,model,ccity_name,manufacturer,channel)
+    get_predict_set(ucity_id,model,ccity_name,manufacturer,channel)
     b = time.time()
     print("cost (minutes)")
     print((b-a)/60)