Commit 93b79322 authored by Your Name

test

parent 043e9155
#coding=utf-8
import tensorflow as tf
import pymysql
from pyspark.conf import SparkConf
import pytispark.pytispark as pti
@@ -9,21 +8,196 @@ import pandas as pd
from datetime import date, timedelta
import datetime
import time
from pyspark import StorageLevel
from pyspark.sql import SparkSession, Row
import os
import sys
from sqlalchemy import create_engine
def con_sql(sql):
    """
    :type sql : str
    :rtype : tuple
    """
    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    db.close()
    return result

def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
    print('Parsing', filenames)
    def _parse_fn(record):
        features = {
            "y": tf.FixedLenFeature([], tf.float32),
            "z": tf.FixedLenFeature([], tf.float32),
            "ids": tf.FixedLenFeature([15], tf.int64),
            "app_list": tf.VarLenFeature(tf.int64),
            "level2_list": tf.VarLenFeature(tf.int64),
            "level3_list": tf.VarLenFeature(tf.int64),
            "tag1_list": tf.VarLenFeature(tf.int64),
            "tag2_list": tf.VarLenFeature(tf.int64),
            "tag3_list": tf.VarLenFeature(tf.int64),
            "tag4_list": tf.VarLenFeature(tf.int64),
            "tag5_list": tf.VarLenFeature(tf.int64),
            "tag6_list": tf.VarLenFeature(tf.int64),
            "tag7_list": tf.VarLenFeature(tf.int64),
            "number": tf.VarLenFeature(tf.int64),
            "uid": tf.VarLenFeature(tf.string),
            "city": tf.VarLenFeature(tf.string),
            "cid_id": tf.VarLenFeature(tf.string)
        }
        parsed = tf.parse_single_example(record, features)
        y = parsed.pop('y')
        z = parsed.pop('z')
        return parsed, {"y": y, "z": z}
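    # Note: VarLenFeature fields parse to tf.SparseTensor (row-wise variable-length id
    # lists), while FixedLenFeature fields parse to dense tensors of the given shape;
    # model_fn below therefore applies embedding_lookup_sparse / tf.sparse.to_dense to them.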
    # Extract lines from the input files using the Dataset API; accepts a single filename or a list of filenames.
dataset = tf.data.TFRecordDataset(filenames).map(_parse_fn, num_parallel_calls=10).prefetch(500000) # multi-thread pre-process then prefetch
# Randomizes input using a window of 256 elements (read into memory)
if perform_shuffle:
dataset = dataset.shuffle(buffer_size=256)
    # Repeat for num_epochs; repeating after the shuffle keeps separate epochs from blending together.
dataset = dataset.repeat(num_epochs)
dataset = dataset.batch(batch_size) # Batch size to use
    # dataset = dataset.padded_batch(batch_size, padded_shapes=({"feeds_ids": [None], "feeds_vals": [None], "title_ids": [None]}, [None]))  # pad variable-length features to a common length
#return dataset.make_one_shot_iterator()
iterator = dataset.make_one_shot_iterator()
batch_features, batch_labels = iterator.get_next()
#return tf.reshape(batch_ids,shape=[-1,field_size]), tf.reshape(batch_vals,shape=[-1,field_size]), batch_labels
#print("-"*100)
#print(batch_features,batch_labels)
return batch_features, batch_labels
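# As a sanity check, input_fn's schema can be exercised with one handwritten record.
# Minimal sketch (TF 1.x); the file name "toy.tfrecord" and all feature values are
# invented for illustration. Only the required FixedLenFeature fields are written;
# omitted VarLenFeature fields simply parse as empty SparseTensors.
#
# example = tf.train.Example(features=tf.train.Features(feature={
#     "y": tf.train.Feature(float_list=tf.train.FloatList(value=[1.0])),
#     "z": tf.train.Feature(float_list=tf.train.FloatList(value=[0.0])),
#     "ids": tf.train.Feature(int64_list=tf.train.Int64List(value=list(range(15)))),
#     "app_list": tf.train.Feature(int64_list=tf.train.Int64List(value=[3, 7])),
# }))
# with tf.python_io.TFRecordWriter("toy.tfrecord") as w:
#     w.write(example.SerializeToString())
# batch_features, batch_labels = input_fn(["toy.tfrecord"], batch_size=1)
# with tf.Session() as sess:
#     print(sess.run(batch_labels))  # {'y': array([1.], dtype=float32), 'z': array([0.], dtype=float32)}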
def model_fn(features, labels, mode, params):
"""Bulid Model function f(x) for Estimator."""
#------hyperparameters----
field_size = params["field_size"]
feature_size = params["feature_size"]
embedding_size = params["embedding_size"]
l2_reg = params["l2_reg"]
learning_rate = params["learning_rate"]
#optimizer = params["optimizer"]
layers = list(map(int, params["deep_layers"].split(',')))
dropout = list(map(float, params["dropout"].split(',')))
ctr_task_wgt = params["ctr_task_wgt"]
common_dims = field_size*embedding_size
    #------build weights------
Feat_Emb = tf.get_variable(name='embeddings', shape=[feature_size, embedding_size], initializer=tf.glorot_normal_initializer())
feat_ids = features['ids']
app_list = features['app_list']
level2_list = features['level2_list']
level3_list = features['level3_list']
tag1_list = features['tag1_list']
tag2_list = features['tag2_list']
tag3_list = features['tag3_list']
tag4_list = features['tag4_list']
tag5_list = features['tag5_list']
tag6_list = features['tag6_list']
tag7_list = features['tag7_list']
number = features['number']
uid = features['uid']
city = features['city']
cid_id = features['cid_id']
#------build f(x)------
with tf.variable_scope("Shared-Embedding-layer"):
embedding_id = tf.nn.embedding_lookup(Feat_Emb,feat_ids)
app_id = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=app_list, sp_weights=None, combiner="sum")
level2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=level2_list, sp_weights=None, combiner="sum")
level3 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=level3_list, sp_weights=None, combiner="sum")
tag1 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag1_list, sp_weights=None, combiner="sum")
tag2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag2_list, sp_weights=None, combiner="sum")
tag3 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag3_list, sp_weights=None, combiner="sum")
tag4 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag4_list, sp_weights=None, combiner="sum")
tag5 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag5_list, sp_weights=None, combiner="sum")
tag6 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag6_list, sp_weights=None, combiner="sum")
tag7 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag7_list, sp_weights=None, combiner="sum")
# x_concat = tf.reshape(embedding_id,shape=[-1, common_dims]) # None * (F * K)
x_concat = tf.concat([tf.reshape(embedding_id, shape=[-1, common_dims]), app_id, level2, level3, tag1,
tag2, tag3, tag4, tag5, tag6, tag7], axis=1)
sample_id = tf.sparse.to_dense(number)
uid = tf.sparse.to_dense(uid,default_value="")
city = tf.sparse.to_dense(city,default_value="")
cid_id = tf.sparse.to_dense(cid_id,default_value="")
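    # embedding_lookup_sparse pools each row's variable-length id list into one
    # embedding_size vector. Toy illustration (made-up values):
    #   Feat_Emb = [[1,0],[0,1],[2,2]]; row 0 holds ids [0,2], row 1 holds id [1]
    #   combiner="sum" -> [[3,2],[0,1]]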
with tf.name_scope("CVR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN:
train_phase = True
else:
train_phase = False
x_cvr = x_concat
for i in range(len(layers)):
x_cvr = tf.contrib.layers.fully_connected(inputs=x_cvr, num_outputs=layers[i], \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='cvr_mlp%d' % i)
y_cvr = tf.contrib.layers.fully_connected(inputs=x_cvr, num_outputs=1, activation_fn=tf.identity, \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='cvr_out')
y_cvr = tf.reshape(y_cvr,shape=[-1])
with tf.name_scope("CTR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN:
train_phase = True
else:
train_phase = False
x_ctr = x_concat
for i in range(len(layers)):
x_ctr = tf.contrib.layers.fully_connected(inputs=x_ctr, num_outputs=layers[i], \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='ctr_mlp%d' % i)
y_ctr = tf.contrib.layers.fully_connected(inputs=x_ctr, num_outputs=1, activation_fn=tf.identity, \
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='ctr_out')
y_ctr = tf.reshape(y_ctr,shape=[-1])
with tf.variable_scope("MTL-Layer"):
pctr = tf.sigmoid(y_ctr)
pcvr = tf.sigmoid(y_cvr)
pctcvr = pctr*pcvr
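        # ESMM factorization over the exposure space: pCTCVR = pCTR * pCVR, i.e.
        # p(click & conversion | impression) = p(click | impression) * p(conversion | click);
        # e.g. pctr = 0.10, pcvr = 0.20 -> pctcvr = 0.02.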
predictions={"pcvr": pcvr, "pctr": pctr, "pctcvr": pctcvr, "sample_id": sample_id, "uid":uid, "city":city, "cid_id":cid_id}
export_outputs = {tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: tf.estimator.export.PredictOutput(predictions)}
# Provide an estimator spec for `ModeKeys.PREDICT`
if mode == tf.estimator.ModeKeys.PREDICT:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
export_outputs=export_outputs)
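# The model_fn in this commit only returns an EstimatorSpec for ModeKeys.PREDICT.
# A plausible TRAIN branch, sketched from the otherwise-unused ctr_task_wgt and
# learning_rate params and the {"y", "z"} labels produced by input_fn (an assumption,
# not part of this commit), would sit just above, inside model_fn:
#
#     if mode == tf.estimator.ModeKeys.TRAIN:
#         ctr_loss = tf.reduce_mean(
#             tf.nn.sigmoid_cross_entropy_with_logits(labels=labels["y"], logits=y_ctr))
#         ctcvr_loss = tf.reduce_mean(tf.losses.log_loss(labels=labels["z"], predictions=pctcvr))
#         loss = ctr_task_wgt * ctr_loss + (1 - ctr_task_wgt) * ctcvr_loss
#         train_op = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(
#             loss, global_step=tf.train.get_global_step())
#         return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)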
def main(te_file):
dt_dir = (date.today() + timedelta(-1)).strftime('%Y%m%d')
model_dir = "hdfs://172.16.32.4:8020/strategy/esmm/model_ckpt/DeepCvrMTL/" + dt_dir
    te_files = ["hdfs://172.16.32.4:8020/strategy/esmm/nearby/part-r-00000"]  # note: unused; prediction reads from the te_file argument
model_params = {
"field_size": 15,
"feature_size": 600000,
"embedding_size": 16,
"learning_rate": 0.0001,
"l2_reg": 0.005,
"deep_layers": '512,256,128,64,32',
"dropout": '0.3,0.3,0.3,0.3,0.3',
"ctr_task_wgt":0.5
}
config = tf.estimator.RunConfig().replace(session_config = tf.ConfigProto(device_count={'GPU':0, 'CPU':36}),
log_step_count_steps=100, save_summary_steps=100)
Estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir=model_dir, params=model_params, config=config)
preds = Estimator.predict(input_fn=lambda: input_fn(te_file, num_epochs=1, batch_size=10000), predict_keys=["pctcvr","pctr","pcvr","sample_id","uid","city","cid_id"])
# with open("/home/gmuser/esmm/nearby/pred.txt", "w") as fo:
# for prob in preds:
# fo.write("%f\t%f\t%f\n" % (prob['pctr'], prob['pcvr'], prob['pctcvr']))
# ctcvr = []
    result_parts = []
    for prob in preds:
        # ctcvr.append((prob["sample_id"][0], prob['pctcvr']))
        result_parts.append(str(prob["sample_id"][0]) + ":" + str(prob["uid"][0]) + ":"
                            + str(prob["city"][0]) + ":" + str(prob["cid_id"][0]) + ":" + str(prob['pctcvr']))
    # str_result = list(prob.keys())
    # return str_result
    return ";".join(result_parts)  # joining once avoids quadratic string concatenation
# indices = []
# for prob in preds:
# indices.append([prob['pctr'], prob['pcvr'], prob['pctcvr']])
# return indices
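# Hypothetical helper (not in this commit): the ";"-joined string returned by main()
# can be split back into per-sample tuples, assuming no joined field contains ":" or ";".
def parse_result(s):
    """'1:u1:beijing:c9:0.02;2:u2:sh:c3:0.01' -> [('1','u1','beijing','c9','0.02'), ...]"""
    return [tuple(rec.split(":")) for rec in s.split(";")]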
def trans(x):
return str(x)[2:-1] if str(x)[0] == 'b' else x
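# trans undoes the textual repr that str() gives Python bytes, e.g.
# trans("b'beijing'") -> "beijing", while other values pass through unchanged.
# Caveat: any plain string that happens to start with 'b' is also stripped,
# so it assumes no legitimate value begins with that character.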
def set_join(lst):
    l = lst.unique().tolist()
@@ -32,88 +206,24 @@ def set_join(lst):
    return ','.join(r)
def main():
# native queue
df2 = pd.read_csv(path+'/native.csv')
df2['cid_id'] = df2['cid_id'].astype(str)
df1 = pd.read_csv(path+"/native/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df2["ctr"],df2["cvr"],df2["ctcvr"] = df1["ctr"],df1["cvr"],df1["ctcvr"]
df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False))\
.reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
df3.columns = ["device_id","city_id","native_queue"]
print("native_device_count",df3.shape)
# nearby queue
df2 = pd.read_csv(path+'/nearby.csv')
df2['cid_id'] = df2['cid_id'].astype(str)
df1 = pd.read_csv(path+"/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df2["ctr"], df2["cvr"], df2["ctcvr"] = df1["ctr"], df1["cvr"], df1["ctcvr"]
df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False))\
.reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
df4.columns = ["device_id","city_id","nearby_queue"]
print("nearby_device_count",df4.shape)
#union
df_all = pd.merge(df3,df4,on=['device_id','city_id'],how='outer').fillna("")
df_all['device_id'] = df_all['device_id'].astype(str)
df_all['city_id'] = df_all['city_id'].astype(str)
df_all["time"] = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
print("union_device_count",df_all.shape)
host='172.16.40.158'
port=4000
user='root'
password='3SYz54LS9#^9sBvC'
db='jerry_test'
charset='utf8'
df_merge = df_all['device_id'] + df_all['city_id']
to_delete = list(df_merge.values)
total = len(to_delete)
df_merge_str = [str(to_delete[:int(total/5)]).strip('[]')]
for i in range(2,6):
start = int(total*(i -1)/5)
end = int(total*i/5)
tmp = str(to_delete[start:end]).strip('[]')
df_merge_str.append(tmp)
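    # The ids are split into five roughly equal slices so each DELETE's IN (...) list stays
    # bounded; e.g. total=103 gives slices of 20, 21, 20, 21 and 21 elements.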
try:
for i in df_merge_str:
delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(i)
con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cur = con.cursor()
cur.execute(delete_str)
con.commit()
print("delete done")
con.close()
        engine = create_engine("mysql+pymysql://%s:%s@%s:%s/%s" % (user, password, host, port, db))
df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False,chunksize=8000)
print("insert done")
except Exception as e:
print(e)
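# Sketch of a parameterized alternative to the chunked string-formatted DELETE above
# (hypothetical helper, not in this commit; assumes the same table and connection
# constants as main(); placeholders avoid quoting/injection issues):
def delete_queue(pairs):
    con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    with con.cursor() as cur:
        # pairs: iterable of (device_id, city_id) tuples
        cur.executemany("delete from esmm_device_diary_queue where device_id = %s and city_id = %s", pairs)
    con.commit()
    con.close()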
if __name__ == "__main__":
    sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
        .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
        .set("spark.tispark.plan.allow_index_double_read", "false") \
        .set("spark.tispark.plan.allow_index_read", "true") \
        .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
        .set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf") \
        .set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec", "snappy")
    spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    path = "hdfs://172.16.32.4:8020/strategy/esmm/"
    # df = spark.read.format("tfrecords").load(path+"nearby/part-r-00000")
    # df.show()

    uid1 = spark.read.format("csv").options(sep=",", header=True).load(path+"nearby/nearby.csv")
    uid1.show()
    pred1 = spark.read.format("csv").options(sep="\t").load(path+"nearby/pred.txt")
    pred1.show()

if __name__ == "__main__":
    if sys.argv[1] == "native":
        b = time.time()
        path = "hdfs://172.16.32.4:8020/strategy/esmm/"
        # df = spark.read.format("tfrecords").load(path+"test_native/part-r-00000")
        # df.show()
        te_files = ["hdfs://172.16.32.4:8020/strategy/esmm/test_native/part-r-00000"]
        print("-" * 100)
        indices = main(te_files)
        print(indices[0])
        print("Elapsed time (s):")
        print(time.time() - b)
    else:
        print("hello")