Commit 335c5551 authored by Your Name

test dist predict

parent cef1c8a8
...@@ -11,6 +11,53 @@ from pyspark import StorageLevel
from pyspark.sql import Row
import os
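# input_fn builds the TFRecord input pipeline consumed by the Estimator:
# parse each serialized Example, optionally shuffle, repeat for num_epochs, then batch.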
def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
print('Parsing', filenames)
def _parse_fn(record):
features = {
"y": tf.FixedLenFeature([], tf.float32),
"z": tf.FixedLenFeature([], tf.float32),
"ids": tf.FixedLenFeature([15], tf.int64),
"app_list": tf.VarLenFeature(tf.int64),
"level2_list": tf.VarLenFeature(tf.int64),
"level3_list": tf.VarLenFeature(tf.int64),
"tag1_list": tf.VarLenFeature(tf.int64),
"tag2_list": tf.VarLenFeature(tf.int64),
"tag3_list": tf.VarLenFeature(tf.int64),
"tag4_list": tf.VarLenFeature(tf.int64),
"tag5_list": tf.VarLenFeature(tf.int64),
"tag6_list": tf.VarLenFeature(tf.int64),
"tag7_list": tf.VarLenFeature(tf.int64),
"number": tf.VarLenFeature(tf.int64),
"uid": tf.VarLenFeature(tf.string),
"city": tf.VarLenFeature(tf.string),
"cid_id": tf.VarLenFeature(tf.int64)
}
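        # Fixed-length features parse to dense tensors (e.g. "ids" -> [15] int64);
        # the VarLenFeature entries come back as tf.SparseTensor.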
parsed = tf.parse_single_example(record, features)
y = parsed.pop('y')
z = parsed.pop('z')
return parsed, {"y": y, "z": z}
    # Extract serialized Examples from the input TFRecord files using the Dataset API; accepts one filename or a list of filenames
dataset = tf.data.TFRecordDataset(filenames).map(_parse_fn, num_parallel_calls=10).prefetch(500000) # multi-thread pre-process then prefetch
# Randomizes input using a window of 256 elements (read into memory)
if perform_shuffle:
dataset = dataset.shuffle(buffer_size=256)
    # Repeat for num_epochs; shuffling before repeat keeps separate epochs from blending together.
dataset = dataset.repeat(num_epochs)
dataset = dataset.batch(batch_size) # Batch size to use
# dataset = dataset.padded_batch(batch_size, padded_shapes=({"feeds_ids": [None], "feeds_vals": [None], "title_ids": [None]}, [None])) #不定长补齐
#return dataset.make_one_shot_iterator()
iterator = dataset.make_one_shot_iterator()
batch_features, batch_labels = iterator.get_next()
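    # get_next() returns tensors that deliver the next batch each time they are evaluated.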
#return tf.reshape(batch_ids,shape=[-1,field_size]), tf.reshape(batch_vals,shape=[-1,field_size]), batch_labels
#print("-"*100)
#print(batch_features,batch_labels)
return batch_features, batch_labels
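# Rough sketch of how this input_fn is expected to be consumed in main() (hypothetical
# names; the Estimator construction itself sits outside this diff):
#   preds = Estimator.predict(input_fn=lambda: input_fn(te_file, num_epochs=1, batch_size=10000))
#   for prob in preds: use prob["pctcvr"], prob["sample_id"], prob["uid"], prob["city"], prob["cid_id"]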
def model_fn(features, labels, mode, params):
    """Build model function f(x) for Estimator."""
    #------hyperparameters----
...@@ -40,6 +87,9 @@ def model_fn(features, labels, mode, params):
    tag6_list = features['tag6_list']
    tag7_list = features['tag7_list']
    number = features['number']
uid = features['uid']
city = features['city']
cid_id = features['cid_id']
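    # uid / city / cid_id are read here only so they can be returned alongside
    # the scores in the predictions dict below.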
    #------build f(x)------
...@@ -61,6 +111,9 @@ def model_fn(features, labels, mode, params):
        tag2, tag3, tag4, tag5, tag6, tag7], axis=1)
    sample_id = tf.sparse.to_dense(number)
uid = tf.sparse.to_dense(uid)
city = tf.sparse.to_dense(city)
cid_id = tf.sparse.to_dense(cid_id)
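    # Densify the sparse id features so they can be returned as plain dense tensors
    # in the predictions dict / PredictOutput.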
    with tf.name_scope("CVR_Task"):
        if mode == tf.estimator.ModeKeys.TRAIN:
...@@ -94,7 +147,7 @@ def model_fn(features, labels, mode, params):
    pcvr = tf.sigmoid(y_cvr)
    pctcvr = pctr*pcvr
    # predictions={"pcvr": pcvr, "pctr": pctr, "pctcvr": pctcvr, "sample_id": sample_id}
    predictions={"pcvr": pcvr, "pctr": pctr, "pctcvr": pctcvr, "sample_id": sample_id, "uid": uid, "city": city, "cid_id": cid_id}
    export_outputs = {tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: tf.estimator.export.PredictOutput(predictions)}
    # Provide an estimator spec for `ModeKeys.PREDICT`
    if mode == tf.estimator.ModeKeys.PREDICT:
...@@ -103,49 +156,7 @@ def model_fn(features, labels, mode, params):
            predictions=predictions,
            export_outputs=export_outputs)
def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
print('Parsing', filenames)
def _parse_fn(record):
features = {
"y": tf.FixedLenFeature([], tf.float32),
"z": tf.FixedLenFeature([], tf.float32),
"ids": tf.FixedLenFeature([15], tf.int64),
"app_list": tf.VarLenFeature(tf.int64),
"level2_list": tf.VarLenFeature(tf.int64),
"level3_list": tf.VarLenFeature(tf.int64),
"tag1_list": tf.VarLenFeature(tf.int64),
"tag2_list": tf.VarLenFeature(tf.int64),
"tag3_list": tf.VarLenFeature(tf.int64),
"tag4_list": tf.VarLenFeature(tf.int64),
"tag5_list": tf.VarLenFeature(tf.int64),
"tag6_list": tf.VarLenFeature(tf.int64),
"tag7_list": tf.VarLenFeature(tf.int64),
"number": tf.VarLenFeature(tf.int64)
}
parsed = tf.parse_single_example(record, features)
y = parsed.pop('y')
z = parsed.pop('z')
return parsed, {"y": y, "z": z}
# Extract lines from input files using the Dataset API, can pass one filename or filename list
dataset = tf.data.TFRecordDataset(filenames).map(_parse_fn, num_parallel_calls=10).prefetch(500000) # multi-thread pre-process then prefetch
# Randomizes input using a window of 256 elements (read into memory)
if perform_shuffle:
dataset = dataset.shuffle(buffer_size=256)
# epochs from blending together.
dataset = dataset.repeat(num_epochs)
dataset = dataset.batch(batch_size) # Batch size to use
# dataset = dataset.padded_batch(batch_size, padded_shapes=({"feeds_ids": [None], "feeds_vals": [None], "title_ids": [None]}, [None])) #不定长补齐
#return dataset.make_one_shot_iterator()
iterator = dataset.make_one_shot_iterator()
batch_features, batch_labels = iterator.get_next()
#return tf.reshape(batch_ids,shape=[-1,field_size]), tf.reshape(batch_vals,shape=[-1,field_size]), batch_labels
#print("-"*100)
#print(batch_features,batch_labels)
return batch_features, batch_labels
def main(te_file):
    dt_dir = (date.today() + timedelta(-1)).strftime('%Y%m%d')
...@@ -175,7 +186,7 @@ def main(te_file):
    str_result = ""
    for prob in preds:
        # ctcvr.append((prob["sample_id"][0],prob['pctcvr']))
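        # Serialize each prediction as "sample_id:uid:city:cid_id:pctcvr"; records are joined by ";".
        # Note: uid and city are tf.string features, so under Python 3 these values come back as
        # bytes and may need .decode("utf-8") before being written out.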
        # str_result = str_result + str(prob["sample_id"][0]) + ":" + str(prob['pctcvr']) + ";"
        str_result = str_result + str(prob["sample_id"][0]) + ":" + str(prob["uid"][0]) + ":" + str(prob["city"][0]) + ":" + str(prob["cid_id"][0]) + ":" + str(prob['pctcvr']) + ";"
    # return ctcvr
    return str_result[:-1]
...@@ -208,16 +219,19 @@ if __name__ == "__main__":
    # te_files.append([path + "test_native/part-r-0000" + str(i)])
    # for i in range(10,100):
    # te_files.append([path + "test_native/part-r-000" + str(i)])
    #
    # rdd_te_files = spark.sparkContext.parallelize(te_files)
    # print("-"*100)
    # print(rdd_te_files.collect())
    # print("-" * 100)
    # indices = rdd_te_files.repartition(100).map(lambda x: main(x))
    # print(indices.take(1))
    #
    # te_result_dataframe = spark.createDataFrame(indices.flatMap(lambda x: x.split(";")).map(lambda l: Row(sample_id=l.split(":")[0],ctcvr=l.split(":")[1])))
    # te_result_dataframe.show()

    te_files = ["hdfs://172.16.32.4:8020/strategy/esmm/test_native/part-r-00000"]

    rdd_te_files = spark.sparkContext.parallelize(te_files)
    print("-"*100)
    print(rdd_te_files.collect())
    print("-" * 100)
    indices = rdd_te_files.repartition(100).map(lambda x: main(x))
    print(indices.take(1))

    te_result_dataframe = spark.createDataFrame(indices.flatMap(lambda x: x.split(";")).map(
        lambda l: Row(sample_id=l.split(":")[0],uid=l.split(":")[1],city=l.split(":")[2],cid_id=l.split(":")[3],ctcvr=l.split(":")[4])))
    te_result_dataframe.show()
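    # main() returns one ";"-separated string per input file, where each record is
    # "sample_id:uid:city:cid_id:pctcvr", so flatMap splits records and the Row constructor splits fields.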
    # te_result_dataframe.toPandas().to_csv("/home/gmuser/esmm/native/pred.txt", header=True)
...