Commit 460e6336 authored by Your Name

dist predict test

parent 855f0003
import tensorflow as tf
import pymysql
from pyspark.conf import SparkConf
import pytispark.pytispark as pti
from pyspark.sql import SparkSession
import datetime
import pandas as pd
from datetime import date, timedelta
import time
from pyspark import StorageLevel
from pyspark.sql import Row
import os
import sys
from sqlalchemy import create_engine
......@@ -30,7 +27,8 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
"tag5_list": tf.VarLenFeature(tf.int64),
"tag6_list": tf.VarLenFeature(tf.int64),
"tag7_list": tf.VarLenFeature(tf.int64),
"number": tf.VarLenFeature(tf.int64),
"search_tag2_list": tf.VarLenFeature(tf.int64),
"search_tag3_list": tf.VarLenFeature(tf.int64),
"uid": tf.VarLenFeature(tf.string),
"city": tf.VarLenFeature(tf.string),
"cid_id": tf.VarLenFeature(tf.string)
......@@ -40,26 +38,24 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
z = parsed.pop('z')
return parsed, {"y": y, "z": z}
# Extract lines from input files using the Dataset API, can pass one filename or filename list
dataset = tf.data.TFRecordDataset(filenames).map(_parse_fn, num_parallel_calls=10).prefetch(500000) # multi-thread pre-process then prefetch
# Randomizes input using a window of 256 elements (read into memory)
if perform_shuffle:
dataset = dataset.shuffle(buffer_size=256)
files = tf.data.Dataset.list_files(filenames)
dataset = files.apply(
tf.data.experimental.parallel_interleave(
lambda file: tf.data.TFRecordDataset(file),
cycle_length=8
)
)
# epochs from blending together.
dataset = dataset.repeat(num_epochs)
dataset = dataset.batch(batch_size) # Batch size to use
# dataset = dataset.padded_batch(batch_size, padded_shapes=({"feeds_ids": [None], "feeds_vals": [None], "title_ids": [None]}, [None])) #不定长补齐
dataset = dataset.apply(tf.data.experimental.map_and_batch(map_func=_parse_fn, batch_size=batch_size, num_parallel_calls=8))
dataset = dataset.prefetch(10000)
#return dataset.make_one_shot_iterator()
iterator = dataset.make_one_shot_iterator()
batch_features, batch_labels = iterator.get_next()
#return tf.reshape(batch_ids,shape=[-1,field_size]), tf.reshape(batch_vals,shape=[-1,field_size]), batch_labels
#print("-"*100)
#print(batch_features,batch_labels)
return batch_features, batch_labels
def model_fn(features, labels, mode, params):
"""Bulid Model function f(x) for Estimator."""
#------hyperparameters----
......@@ -88,7 +84,8 @@ def model_fn(features, labels, mode, params):
tag5_list = features['tag5_list']
tag6_list = features['tag6_list']
tag7_list = features['tag7_list']
number = features['number']
search_tag2_list = features['search_tag2_list']
search_tag3_list = features['search_tag3_list']
uid = features['uid']
city = features['city']
cid_id = features['cid_id']
......@@ -107,12 +104,14 @@ def model_fn(features, labels, mode, params):
tag5 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag5_list, sp_weights=None, combiner="sum")
tag6 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag6_list, sp_weights=None, combiner="sum")
tag7 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag7_list, sp_weights=None, combiner="sum")
search_tag2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=search_tag2_list, sp_weights=None, combiner="sum")
search_tag3 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=search_tag3_list, sp_weights=None, combiner="sum")
# x_concat = tf.reshape(embedding_id,shape=[-1, common_dims]) # None * (F * K)
x_concat = tf.concat([tf.reshape(embedding_id, shape=[-1, common_dims]), app_id, level2, level3, tag1,
tag2, tag3, tag4, tag5, tag6, tag7], axis=1)
tag2, tag3, tag4, tag5, tag6, tag7,search_tag2,search_tag3], axis=1)
sample_id = tf.sparse.to_dense(number)
uid = tf.sparse.to_dense(uid,default_value="")
city = tf.sparse.to_dense(city,default_value="")
cid_id = tf.sparse.to_dense(cid_id,default_value="")
......@@ -149,7 +148,7 @@ def model_fn(features, labels, mode, params):
pcvr = tf.sigmoid(y_cvr)
pctcvr = pctr*pcvr
predictions={"pcvr": pcvr, "pctr": pctr, "pctcvr": pctcvr, "sample_id": sample_id, "uid":uid, "city":city, "cid_id":cid_id}
predictions = {"pctcvr": pctcvr, "uid": uid, "city": city, "cid_id": cid_id}
export_outputs = {tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: tf.estimator.export.PredictOutput(predictions)}
# Provide an estimator spec for `ModeKeys.PREDICT`
if mode == tf.estimator.ModeKeys.PREDICT:
......@@ -161,7 +160,6 @@ def model_fn(features, labels, mode, params):
def main(te_file):
dt_dir = (date.today() + timedelta(-1)).strftime('%Y%m%d')
model_dir = "hdfs://172.16.32.4:8020/strategy/esmm/model_ckpt/DeepCvrMTL/" + dt_dir
te_files = ["hdfs://172.16.32.4:8020/strategy/esmm/nearby/part-r-00000"]
model_params = {
"field_size": 15,
"feature_size": 600000,
......@@ -176,25 +174,13 @@ def main(te_file):
log_step_count_steps=100, save_summary_steps=100)
Estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir=model_dir, params=model_params, config=config)
preds = Estimator.predict(input_fn=lambda: input_fn(te_file, num_epochs=1, batch_size=10000), predict_keys=["pctcvr","pctr","pcvr","sample_id","uid","city","cid_id"])
# with open("/home/gmuser/esmm/nearby/pred.txt", "w") as fo:
# for prob in preds:
# fo.write("%f\t%f\t%f\n" % (prob['pctr'], prob['pcvr'], prob['pctcvr']))
preds = Estimator.predict(input_fn=lambda: input_fn(te_file, num_epochs=1, batch_size=10000), predict_keys=["pctcvr","uid","city","cid_id"])
# ctcvr = []
str_result = ""
for prob in preds:
# ctcvr.append((prob["sample_id"][0],prob['pctcvr']))
str_result = str_result + str(prob["sample_id"][0]) + ":"+ str(prob["uid"][0]) + ":" + str(prob["city"][0]) + ":" + str(prob["cid_id"][0]) + ":" + str(prob['pctcvr']) + ";"
# str_result = list(prob.keys())
# return str_result
str_result = str_result + str(prob["uid"][0]) + ":" + str(prob["city"][0]) + ":" + str(prob["cid_id"][0]) + ":" + str(prob['pctcvr']) + ";"
return str_result[:-1]
# indices = []
# for prob in preds:
# indices.append([prob['pctr'], prob['pcvr'], prob['pctcvr']])
# return indices
def trans(x):
    """Strip the ``b'...'`` wrapper from the text form of a bytes value.

    ``str()`` applied to a bytes object yields text such as ``"b'abc'"``.
    This helper returns the inner payload (``"abc"``) for such values and
    returns every other value unchanged.

    Args:
        x: Any value; it is inspected via ``str(x)``.

    Returns:
        The text between the quotes when ``str(x)`` has the form ``b'...'``
        or ``b"..."``; otherwise ``x`` itself, untouched.
    """
    text = str(x)
    # Require the full bytes-literal shape (prefix b, an opening quote and the
    # matching closing quote). Checking only the first character would raise
    # IndexError on an empty string and would truncate ordinary values that
    # merely start with the letter "b".
    if len(text) >= 3 and text[0] == 'b' and text[1] in ("'", '"') and text[-1] == text[1]:
        return text[2:-1]
    return x
......@@ -230,24 +216,23 @@ if __name__ == "__main__":
te_files = []
for i in range(0,10):
te_files.append([path + "test_native/part-r-0000" + str(i)])
te_files.append([path + "native/part-r-0000" + str(i)])
for i in range(10,100):
te_files.append([path + "test_native/part-r-000" + str(i)])
te_files.append([path + "native/part-r-000" + str(i)])
# te_files = ["hdfs://172.16.32.4:8020/strategy/esmm/test_native/part-r-00000"]
rdd_te_files = spark.sparkContext.parallelize(te_files)
print("-" * 100)
indices = rdd_te_files.repartition(100).map(lambda x: main(x))
indices = rdd_te_files.repartition(40).map(lambda x: main(x))
# print(indices.take(1))
print("dist predict native")
te_result_dataframe = spark.createDataFrame(indices.flatMap(lambda x: x.split(";")).map(
lambda l: Row(sample_id=l.split(":")[0],uid=l.split(":")[1],city=l.split(":")[2],cid_id=l.split(":")[3],ctcvr=l.split(":")[4])))
lambda l: Row(uid=l.split(":")[0],city=l.split(":")[1],cid_id=l.split(":")[2],ctcvr=l.split(":")[3])))
# te_result_dataframe.show()
te_result_dataframe.show()
te_result_dataframe.repartition(50).write.format("parquet").save(path=path+"native_result/",mode="overwrite")
# te_result_dataframe.repartition(50).write.format("parquet").save(path=path+"native_result/",mode="overwrite")
print("耗时(秒):")
print((time.time()-b))
......@@ -276,21 +261,19 @@ if __name__ == "__main__":
# #nearby data
te_files = []
for i in range(0,10):
te_files.append([path + "test_nearby/part-r-0000" + str(i)])
te_files.append([path + "nearby/part-r-0000" + str(i)])
for i in range(10,100):
te_files.append([path + "test_nearby/part-r-000" + str(i)])
te_files.append([path + "nearby/part-r-000" + str(i)])
# te_files = ["hdfs://172.16.32.4:8020/strategy/esmm/test_nearby/part-r-00000"]
rdd_te_files = spark.sparkContext.parallelize(te_files)
print("-"*100)
indices = rdd_te_files.repartition(100).map(lambda x: main(x))
indices = rdd_te_files.repartition(40).map(lambda x: main(x))
# print(indices.take(1))
print("dist predict nearby")
te_result_dataframe = spark.createDataFrame(indices.flatMap(lambda x: x.split(";")).map(
lambda l: Row(sample_id=l.split(":")[0],uid=l.split(":")[1],city=l.split(":")[2],cid_id=l.split(":")[3],ctcvr=l.split(":")[4])))
lambda l: Row(uid=l.split(":")[0],city=l.split(":")[1],cid_id=l.split(":")[2],ctcvr=l.split(":")[3])))
# print("nearby rdd data")
# te_result_dataframe.show()
......@@ -338,43 +321,45 @@ if __name__ == "__main__":
df_all['city_id'] = df_all['city_id'].astype(str)
df_all["time"] = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
print("union_device_count", df_all.shape)
host = '172.16.40.158'
port = 4000
user = 'root'
password = '3SYz54LS9#^9sBvC'
db = 'jerry_test'
charset = 'utf8'
#delete table
df_merge = df_all['device_id'] + df_all['city_id']
to_delete = list(df_merge.values)
total = len(to_delete)
df_merge_str = [str(to_delete[:int(total / 5)]).strip('[]')]
for i in range(2, 6):
start = int(total * (i - 1) / 5)
end = int(total * i / 5)
tmp = str(to_delete[start:end]).strip('[]')
df_merge_str.append(tmp)
try:
for i in df_merge_str:
delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(i)
con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
db='jerry_test')
cur = con.cursor()
cur.execute(delete_str)
con.commit()
print("delete done")
con.close()
engine = create_engine(str(r"mysql+pymysql://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
df_all.to_sql('esmm_device_diary_queue', con=engine, if_exists='append', index=False, chunksize=8000)
print("insert done")
except Exception as e:
print(e)
print(df_all.head(10))
# host = '172.16.40.158'
# port = 4000
# user = 'root'
# password = '3SYz54LS9#^9sBvC'
# db = 'jerry_test'
# charset = 'utf8'
#
#
#
# #delete table
# df_merge = df_all['device_id'] + df_all['city_id']
# to_delete = list(df_merge.values)
# total = len(to_delete)
# df_merge_str = [str(to_delete[:int(total / 5)]).strip('[]')]
# for i in range(2, 6):
# start = int(total * (i - 1) / 5)
# end = int(total * i / 5)
# tmp = str(to_delete[start:end]).strip('[]')
# df_merge_str.append(tmp)
#
# try:
# for i in df_merge_str:
# delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(i)
# con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
# db='jerry_test')
# cur = con.cursor()
# cur.execute(delete_str)
# con.commit()
# print("delete done")
# con.close()
# engine = create_engine(str(r"mysql+pymysql://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
# df_all.to_sql('esmm_device_diary_queue', con=engine, if_exists='append', index=False, chunksize=8000)
# print("insert done")
#
# except Exception as e:
# print(e)
print("耗时(min):")
print((time.time()-b)/60)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment