import requests
import tensorflow as tf
import json
import pandas as pd
import time
import sys
import os
from datetime import date, timedelta

# Make the project root importable so the utils package resolves
sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
import utils.configUtils as configUtils
import utils.connUtils as connUtils

VERSION = configUtils.SERVICE_VERSION
trainDay = time.strftime("%Y%m%d%H", time.localtime())
data_path_train = "/data/files/service_feature_{}_train.csv".format(VERSION)
data_path_test = "/data/files/service_feature_{}_test.csv".format(VERSION)
model_file = configUtils.SERVICE_MODEL_PATH + "/" + trainDay


def is_float(s):
    try:
        float(s)
        return True
    except ValueError:
        return False


# Convert column dtypes: vocabulary (categorical) features to string, the rest to float
def csvTypeConvert(columns, df, data_vocab):
    df["label"] = df["label"].astype("int")
    for k in columns:
        # Fill NA for categorical features with the sentinel "-1"
        if data_vocab.get(k):
            df[k] = df[k].fillna("-1")
            df[k] = df[k].astype("string")
        elif k != "label":
            # df[k] = df[k].map(lambda x: x if is_float(x) else 0)
            df[k] = df[k].fillna(0)
            df[k] = df[k].astype("float")
    # print(df.dtypes)
    return df


def loadData(data_path):
    print("Loading data...")
    timestmp1 = int(round(time.time() * 1000))
    df = pd.read_csv(data_path, sep="|")
    timestmp2 = int(round(time.time() * 1000))
    print("Data load time (ms): {}".format(timestmp2 - timestmp1))
    return df


def getWeight(x):
    # Map a rating-like value to a sample-weight bucket
    res = 1
    try:
        p = int(x)
        if 0 < p <= 5:
            res = 2
        elif 5 < p <= 10:
            res = 3
        elif p > 10:
            res = 4
    except Exception as e:
        print(e)
    return res


def getDataSet(df, shuffleSize=10000, batchSize=128):
    # print(df.dtypes)
    labels = df.pop('label')
    # df["rating"] = df["rating"].map(getWeight)
    # weights = df.pop('rating')
    dataSet = tf.data.Dataset.from_tensor_slices((dict(df), labels)).shuffle(shuffleSize).batch(batchSize)
    return dataSet


def getTrainColumns(train_columns, data_vocab):
    emb_columns = []
    number_columns = []
    inputs = {}
    for feature in train_columns:
        # Categorical features: vocabulary lookup plus a 10-dim embedding
        if data_vocab.get(feature):
            cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=data_vocab[feature])
            col = tf.feature_column.embedding_column(cat_col, 10)
            emb_columns.append(col)
            inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
        # Numeric features are identified by the "_number" suffix
        elif feature.endswith("_number"):
            col = tf.feature_column.numeric_column(feature)
            number_columns.append(col)
            inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='float32')
    return emb_columns, number_columns, inputs


def train(emb_columns, number_columns, inputs, train_dataset):
    wide = tf.keras.layers.DenseFeatures(emb_columns + number_columns)(inputs)
    deep = tf.keras.layers.Dense(64, activation='relu')(wide)
    deep = tf.keras.layers.Dropout(0.2)(deep)
    concat_layer = tf.keras.layers.concatenate([wide, deep], axis=1)
    # deep = tf.keras.layers.Dense(64, activation='relu')(deep)
    # deep = tf.keras.layers.Dropout(0.5)(deep)
    output_layer = tf.keras.layers.Dense(1, activation='sigmoid')(concat_layer)
    # output_layer = FM(1)(deep)
    model = tf.keras.Model(inputs, output_layer)
    # Compile the model: set loss function, optimizer and evaluation metrics
    model.compile(
        loss='binary_crossentropy',
        optimizer='adam',
        metrics=['accuracy', tf.keras.metrics.AUC(curve='ROC'), tf.keras.metrics.AUC(curve='PR')])
    # Train and save the model
    print("train start...")
    model.fit(train_dataset, epochs=5)
    print("train end...")
    print("train save...")
    model.save(model_file, include_optimizer=False, save_format='tf')
    return model
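
# The wiring below is a minimal sketch, not part of the original pipeline: it
# shows how loadData/getTrainColumns/csvTypeConvert/getDataSet/train fit
# together, assuming a hypothetical vocabulary with one categorical feature
# "city" and one numeric feature "price_number" (the real vocabulary is loaded
# from configUtils.VOCAB_PATH in the commented-out __main__ block):
#
#   data_vocab = {"city": ["bj", "sh", "gz"]}
#   df = loadData(data_path_train)
#   emb_cols, num_cols, inputs = getTrainColumns(df.columns.tolist(), data_vocab)
#   cols = list(inputs.keys())
#   df = csvTypeConvert(cols, df[cols + ["label"]], data_vocab)
#   model = train(emb_cols, num_cols, inputs, getDataSet(df))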

def evaluate(model, test_dataset):
    if not model:
        print("Loading model...")
        model = tf.keras.models.load_model(model_file)
    # Evaluate the model
    timestmp1 = int(round(time.time()))
    print("evaluate:")
    test_loss, test_accuracy, test_roc_auc, test_pr_auc = model.evaluate(test_dataset)
    print('\n\nTest Loss {}, Test Accuracy {}, Test ROC AUC {}, Test PR AUC {}'.format(test_loss, test_accuracy, test_roc_auc, test_pr_auc))
    print("Evaluation time (s):", int(round(time.time())) - timestmp1)


def predict(model_path, df):
    print("Loading model...")
    model_new = tf.keras.models.load_model(model_path)
    # model_new.summary()
    print("Model loaded...")
    # model = tf.keras.models.model_from_json(model.to_json)
    n = 1000
    dd = dict(df.sample(n=n))
    for i in range(10):
        timestmp1 = int(round(time.time() * 1000))
        model_new.predict(dd, batch_size=10000)
        print("Test samples: {}, inference time (ms): {}".format(n, int(round(time.time() * 1000)) - timestmp1))


def addDays(n, format="%Y%m%d"):
    return (date.today() + timedelta(days=n)).strftime(format)


def test(df_train):
    VERSION = configUtils.SERVICE_VERSION
    FEATURE_USER_KEY = "Strategy:rec:feature:service:" + VERSION + ":user:"
    FEATURE_ITEM_KEY = "Strategy:rec:feature:service:" + VERSION + ":item:"
    FEATURE_VOCAB_KEY = "Strategy:rec:vocab:service:" + VERSION
    FEATURE_COLUMN_KEY = "Strategy:rec:column:service:" + VERSION
    conn = connUtils.getRedisConn4()
    s = conn.get(FEATURE_COLUMN_KEY)
    user = conn.get(FEATURE_USER_KEY + "869982038583034")
    item = conn.get(FEATURE_ITEM_KEY + "5884526")
    # json.loads no longer accepts an `encoding` argument on Python 3.9+
    datasColumnss = json.loads(s)
    user_d = json.loads(user)
    item_d = json.loads(item)
    ddd = {}
    for d in datasColumnss:
        # Default to "-1"; an item-side value overrides a user-side one
        res = ["-1"]
        if d in user_d:
            res = [user_d[d]]
        if d in item_d:
            res = [item_d[d]]
        ddd[d] = res
    # # conn.get()
    # ddd = {}
    # datasColumnss = df_train.columns.to_list()
    # dd = df_train.sample(n=10)
    # for c in datasColumnss:
    #     vvv = dd[c].tolist()
    #     ddd[c] = vvv
    pre_data = {"inputs": ddd}
    pre_data = json.dumps(pre_data)
    # pre_data = pre_data.replace("'", '"')
    print(pre_data)
    timestmp1 = int(round(time.time() * 1000))
    r = requests.post('http://tensorserving:80/v1/models/service:predict', data=pre_data)
    # Print the serving response; the payload above carries a single sample
    print(r.text)
    print("Test samples: {}, request time (ms): {}".format(1, int(round(time.time() * 1000)) - timestmp1))


if __name__ == '__main__':
    # curTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # print("train_service run time: {}".format(curTime))
    # splitTimestamp = int(time.mktime(time.strptime(addDays(-1), "%Y%m%d")))
    #
    # # Load the feature vocabulary dictionary
    # print("Loading vocabulary...")
    # data_vocab = json.load(open(configUtils.VOCAB_PATH, mode='r', encoding='utf-8'))
    # print("Vocabulary keys:", str(data_vocab.keys()))
    # # data_vocab = getDataVocabFromRedis(VERSION)
    # assert data_vocab
    #
    # timestmp1 = int(round(time.time()))
    # df_train = loadData(data_path_train)
    # print(df_train.dtypes)
    # print("Training data columns:", df_train.columns)
    #
    # df_test = df_train.loc[df_train['timestamp'] >= splitTimestamp]
    # df_train = df_train.loc[df_train['timestamp'] < splitTimestamp]
    # # df_test = loadData(data_path_test)
    # timestmp2 = int(round(time.time()))
    # print("Data load time (s): {}".format(timestmp2 - timestmp1))
    #
    # # Collect the training columns
    # columns = df_train.columns.tolist()
    # print("Raw data columns:")
    # print(columns)
    # emb_columns, number_columns, inputs = getTrainColumns(columns, data_vocab)
    # print("Training columns:")
    # datasColumns = list(inputs.keys())
    # print(datasColumns)
    #
    # df_train = df_train[datasColumns + ["label"]]
    # df_test = df_test[datasColumns + ["label"]]
    #
    # trainSize = df_train["label"].count()
    # print("trainSize:{}".format(trainSize))
    # testSize = df_test["label"].count()
    # print("trainSize:{},testSize:{}".format(trainSize, testSize))
    #
    # # Convert column dtypes
    # df_train = csvTypeConvert(datasColumns, df_train, data_vocab)
    # df_test = csvTypeConvert(datasColumns, df_test, data_vocab)
    test(None)
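
# For reference, the body that test() posts to TensorFlow Serving uses the
# columnar "inputs" format of the REST predict API: one list of values per
# feature, all lists the same length. With the hypothetical features "city"
# and "price_number" from the sketch above, a single-sample payload would be:
#
#   {"inputs": {"city": ["bj"], "price_number": [12.5]}}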