Commit 634d9af6 authored by 郭羽's avatar 郭羽

service model 优化

parent 5e1859bf
import requests
import utils.configUtils as configUtils
import utils.connUtils as connUtils
import json
import tensorflow as tf
import json
import pandas as pd
import time
import sys
import os
from datetime import date, timedelta
sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
import utils.configUtils as configUtils
# Service/model version tag, shared with the feature pipeline and redis keys.
VERSION = configUtils.SERVICE_VERSION
# Run tag: wall-clock hour the script started, e.g. "2021103015"; used as the
# saved-model directory name so each hourly run gets its own export.
trainDay = time.strftime("%Y%m%d%H", time.localtime())
# Input CSVs produced upstream ('|'-separated; see loadData).
data_path_train = "/data/files/service_feature_{}_train.csv".format(VERSION)
data_path_test = "/data/files/service_feature_{}_test.csv".format(VERSION)
# Export path for the trained SavedModel (one subdir per run hour).
model_file = configUtils.SERVICE_MODEL_PATH + "/" + trainDay
def is_float(s):
    """Return True if ``s`` can be converted to a float, else False.

    Fix: the original caught only ValueError, so non-string/non-numeric
    inputs such as ``None`` raised TypeError instead of returning False —
    a predicate named ``is_float`` should never raise on bad input.
    """
    try:
        float(s)
        return True
    except (TypeError, ValueError):
        return False
# Data type conversion for model input.
def csvTypeConvert(columns, df, data_vocab):
    """Cast ``df`` columns in place to the dtypes the feature columns expect.

    ``label`` becomes int; columns present in ``data_vocab`` are treated as
    categorical (missing values -> "-1", dtype "string"); every other listed
    column is treated as continuous (missing values -> 0, dtype "float").
    Returns the same dataframe for convenience.
    """
    df["label"] = df["label"].astype("int")
    for col in columns:
        if data_vocab.get(col):
            # Categorical: sentinel "-1" for NA, pandas string dtype.
            df[col] = df[col].fillna("-1").astype("string")
        elif col != "label":
            # Continuous: zero-fill NA, cast to float.
            df[col] = df[col].fillna(0).astype("float")
    return df
def loadData(data_path):
    """Read a '|'-separated CSV into a dataframe, logging the elapsed time."""
    print("读取数据...")
    start_ms = int(round(time.time() * 1000))
    frame = pd.read_csv(data_path, sep="|")
    elapsed_ms = int(round(time.time() * 1000)) - start_ms
    print("读取数据耗时ms:{}".format(elapsed_ms))
    return frame
def getWeight(x):
    """Map a count-like value to a sample weight.

    Buckets: p <= 0 -> 1, 1..5 -> 2, 6..10 -> 3, >10 -> 4.
    Inputs that cannot be converted with ``int()`` fall back to 1.

    Fix: the original caught a bare broad ``Exception``; narrowed to the
    two errors ``int()`` actually raises (TypeError, ValueError) so real
    bugs are no longer silently swallowed.
    """
    weight = 1
    try:
        p = int(x)
    except (TypeError, ValueError) as e:
        # Non-numeric input keeps the default weight; log and bail out.
        print(e)
        return weight
    if 0 < p <= 5:
        weight = 2
    elif 5 < p <= 10:
        weight = 3
    elif p > 10:
        weight = 4
    return weight
def getDataSet(df, shuffleSize=10000, batchSize=128):
    """Build a shuffled, batched tf.data pipeline from ``df``.

    NOTE: pops the 'label' column out of ``df`` in place; callers must not
    reuse ``df`` expecting the label to still be there.
    """
    targets = df.pop('label')
    features = dict(df)
    dataset = tf.data.Dataset.from_tensor_slices((features, targets))
    return dataset.shuffle(shuffleSize).batch(batchSize)
def getTrainColumns(train_columns, data_vocab):
    """Split raw columns into feature columns and Keras inputs.

    Columns found in ``data_vocab`` become 10-dim embedding columns with a
    string input; columns ending in "_number" become numeric columns with a
    float32 input. Everything else is dropped.

    Returns (emb_columns, number_columns, inputs).
    """
    emb_columns = []
    number_columns = []
    inputs = {}
    for name in train_columns:
        vocab = data_vocab.get(name)
        if vocab:
            # Categorical feature: vocabulary lookup -> 10-dim embedding.
            cat = tf.feature_column.categorical_column_with_vocabulary_list(
                key=name, vocabulary_list=vocab)
            emb_columns.append(tf.feature_column.embedding_column(cat, 10))
            inputs[name] = tf.keras.layers.Input(name=name, shape=(), dtype='string')
        elif name.endswith("_number"):
            # Continuous feature, identified by naming convention.
            number_columns.append(tf.feature_column.numeric_column(name))
            inputs[name] = tf.keras.layers.Input(name=name, shape=(), dtype='float32')
    return emb_columns, number_columns, inputs
def train(emb_columns, number_columns, inputs, train_dataset):
    """Build, fit for 5 epochs, and save the model to ``model_file``.

    Architecture: a DenseFeatures layer over all feature columns, one
    64-unit relu layer with dropout, the two concatenated, then a single
    sigmoid output for binary classification.
    """
    feature_layer = tf.keras.layers.DenseFeatures(emb_columns + number_columns)(inputs)
    hidden = tf.keras.layers.Dense(64, activation='relu')(feature_layer)
    hidden = tf.keras.layers.Dropout(0.2)(hidden)
    merged = tf.keras.layers.concatenate([feature_layer, hidden], axis=1)
    prediction = tf.keras.layers.Dense(1, activation='sigmoid')(merged)
    model = tf.keras.Model(inputs, prediction)
    # Binary cross-entropy with both ROC and PR AUC tracked during training.
    model.compile(
        loss='binary_crossentropy',
        optimizer='adam',
        metrics=['accuracy',
                 tf.keras.metrics.AUC(curve='ROC'),
                 tf.keras.metrics.AUC(curve='PR')])
    print("train start...")
    model.fit(train_dataset, epochs=5)
    print("train end...")
    print("train save...")
    # Optimizer state is not needed for serving, so it is excluded.
    model.save(model_file, include_optimizer=False, save_format='tf')
    return model
def evaluate(model, test_dataset):
    """Evaluate ``model`` on ``test_dataset``, loading from ``model_file``
    when no model instance is passed in, and print loss/accuracy/AUCs."""
    if not model:
        print("加载模型中")
        model = tf.keras.models.load_model(model_file)
    started = int(round(time.time()))
    print("evaluate:")
    loss, accuracy, roc_auc, pr_auc = model.evaluate(test_dataset)
    print('\n\nTest Loss {}, Test Accuracy {}, Test ROC AUC {}, Test PR AUC {}'.format(
        loss, accuracy, roc_auc, pr_auc))
    print("验证耗时s:", int(round(time.time())) - started)
def predict(model_path, df):
    """Latency smoke test: load the SavedModel at ``model_path`` and time
    10 predict() calls over a 1000-row sample of ``df``."""
    print("加载模型中...")
    loaded = tf.keras.models.load_model(model_path)
    print("模型加载完成...")
    sample_size = 1000
    sample = dict(df.sample(n=sample_size))
    for _ in range(10):
        start_ms = int(round(time.time() * 1000))
        loaded.predict(sample, batch_size=10000)
        print("测试样本数:{},测试耗时ms:{}".format(
            sample_size, int(round(time.time() * 1000)) - start_ms))
def addDays(n, format="%Y%m%d"):
    """Return today shifted by ``n`` days, rendered with ``format``."""
    shifted = date.today() + timedelta(days=n)
    return shifted.strftime(format)
def test(df_train):
    """Smoke-test the TF-Serving endpoint with 10 rows sampled from ``df_train``.

    Reads the serving column list from redis, builds a JSON payload in the
    TF-Serving "inputs" format, POSTs it, and prints the round-trip time.

    Fix: ``json.loads(s, encoding='utf-8')`` — the ``encoding`` keyword was
    deprecated in Python 3.1 and removed in 3.9, where it raises TypeError;
    ``json.loads`` decodes str/bytes input itself. Also dropped the unused
    FEATURE_USER/ITEM/VOCAB key constants.
    """
    VERSION = configUtils.SERVICE_VERSION
    FEATURE_COLUMN_KEY = "Strategy:rec:column:service:" + VERSION
    conn = connUtils.getRedisConn4()
    s = conn.get(FEATURE_COLUMN_KEY)
    # Column order for the serving payload comes from redis, not the df.
    datasColumnss = json.loads(s)
    dd = df_train.sample(n=10)
    ddd = {c: dd[c].tolist() for c in datasColumnss}
    pre_data = json.dumps({"inputs": ddd})
    timestmp1 = int(round(time.time() * 1000))
    r = requests.post('http://tensorserving:80/v1/models/service:predict', data=pre_data)
    print("测试样本数:{},测试耗时ms:{}".format(10, int(round(time.time() * 1000)) - timestmp1))
if __name__ == '__main__':
    curTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    print("train_service执行时间:{}".format(curTime))
    # Midnight of yesterday as a unix timestamp: rows at/after this instant
    # form the test split, older rows the train split.
    splitTimestamp = int(time.mktime(time.strptime(addDays(-1), "%Y%m%d")))

    # Load the feature vocabulary (categorical column -> list of values) from
    # the JSON file at configUtils.VOCAB_PATH.
    print("加载模型字典...")
    data_vocab = json.load(open(configUtils.VOCAB_PATH,mode='r',encoding='utf-8'))
    print("字典keys:",str(data_vocab.keys()))
    # data_vocab = getDataVocabFromRedis(VERSION)
    assert data_vocab

    timestmp1 = int(round(time.time()))
    df_train = loadData(data_path_train)
    print(df_train.dtypes)
    print("训练数据列:",df_train.columns)
    # Time-based split on the 'timestamp' column; the separate test CSV is
    # not used in this revision (see the commented loadData call below).
    df_test = df_train.loc[df_train['timestamp']>=splitTimestamp]
    df_train = df_train.loc[df_train['timestamp'] < splitTimestamp]
    # df_test = loadData(data_path_test)
    timestmp2 = int(round(time.time()))
    print("读取数据耗时s:{}".format(timestmp2 - timestmp1))

    # Derive the training columns: categorical (in vocab) and "*_number"
    # numeric columns; everything else is dropped.
    columns = df_train.columns.tolist()
    print("原始数据列:")
    print(columns)
    emb_columns,number_columns,inputs = getTrainColumns(columns, data_vocab)
    print("训练列:")
    datasColumns = list(inputs.keys())
    print(datasColumns)

    df_train = df_train[datasColumns + ["label"]]
    df_test = df_test[datasColumns + ["label"]]
    trainSize = df_train["label"].count()
    print("trainSize:{}".format(trainSize))
    testSize = df_test["label"].count()
    print("trainSize:{},testSize{}".format(trainSize,testSize))
    # Cast columns to the dtypes the feature columns expect.
    df_train = csvTypeConvert(datasColumns,df_train,data_vocab)
    df_test = csvTypeConvert(datasColumns,df_test,data_vocab)
    # NOTE(review): this revision only runs the serving smoke test; the
    # train()/evaluate() pipeline is prepared above but never invoked —
    # confirm this is intentional.
    test(df_train)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment