import tensorflow as tf
import json
import pandas as pd
import time
import sys
import os
from datetime import date, timedelta

sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
import utils.configUtils as configUtils

# feature groups
ITEM_NUMBER_COLUMNS = ["item_" + c for c in ["smart_rank2"]]
embedding_columns = ["itemid", "userid"] + ["item_" + c for c in ["doctor_id", "hospital_id", "merchant_id"]]
multi_columns = ["tags_v3", "first_demands", "second_demands", "first_solutions", "second_solutions", "first_positions", "second_positions"]
one_hot_columns = ["user_os"] + ["item_" + c for c in ["service_type", "doctor_type", "doctor_famous", "hospital_city_tag_id", "hospital_type", "hospital_is_high_quality"]]
# history_columns = ["userRatedHistory"]

# data paths
# data_path_train = "/Users/zhigangzheng/Desktop/work/guoyu/service_sort/train/part-00000-a61205d1-ad4e-4fa7-895d-ad8db41189e6-c000.csv"
# data_path_test = "/Users/zhigangzheng/Desktop/work/guoyu/service_sort/train/part-00000-a61205d1-ad4e-4fa7-895d-ad8db41189e6-c000.csv"

VERSION = configUtils.SERVICE_VERSION
trainDay = time.strftime("%Y%m%d%H", time.localtime())
data_path_train = "/data/files/service_feature_{}_train.csv".format(VERSION)
data_path_test = "/data/files/service_feature_{}_test.csv".format(VERSION)
model_file = configUtils.SERVICE_MODEL_PATH + "/" + trainDay


def is_float(s):
    try:
        float(s)
        return True
    except ValueError:
        return False


# data type conversion: vocabulary (discrete) columns become strings, the rest become floats
def csvTypeConvert(columns, df, data_vocab):
    df["label"] = df["label"].astype("int")
    for k in columns:
        # fill NA for discrete columns
        if data_vocab.get(k):
            df[k] = df[k].fillna("-1")
            df[k] = df[k].astype("string")
        elif k != "label":
            # df[k] = df[k].map(lambda x: x if is_float(x) else 0)
            df[k] = df[k].fillna(0)
            df[k] = df[k].astype("float")
    # print(df.dtypes)
    return df


def loadData(data_path):
    print("loading data...")
    timestmp1 = int(round(time.time() * 1000))
    df = pd.read_csv(data_path, sep="|")
    timestmp2 = int(round(time.time() * 1000))
    print("data loading time (ms): {}".format(timestmp2 - timestmp1))
    return df


# map a rating to a sample weight (currently unused, see getDataSet)
def getWeight(x):
    res = 1
    try:
        p = int(x)
        if 0 < p <= 5:
            res = 2
        elif 5 < p <= 10:
            res = 3
        elif p > 10:
            res = 4
    except Exception as e:
        print(e)
    return res


def getDataSet(df, shuffleSize=10000, batchSize=128):
    # print(df.dtypes)
    labels = df.pop('label')
    # df["rating"] = df["rating"].map(getWeight)
    # weights = df.pop('rating')
    dataSet = tf.data.Dataset.from_tensor_slices((dict(df), labels)).shuffle(shuffleSize).batch(batchSize)
    return dataSet


# build feature columns and Keras inputs from the training columns and the vocabulary
def getTrainColumns(train_columns, data_vocab):
    emb_columns = []
    number_columns = []
    oneHot_columns = []
    dataColumns = []
    inputs = {}
    # discrete features
    for feature in train_columns:
        if data_vocab.get(feature):
            if feature.count("__") > 0:
                cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=data_vocab[feature])
                col = tf.feature_column.embedding_column(cat_col, 5)
                emb_columns.append(col)
                dataColumns.append(feature)
                inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
            elif feature in one_hot_columns or feature.count("Bucket") > 0:
                cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=data_vocab[feature])
                # col = tf.feature_column.indicator_column(cat_col)
                col = tf.feature_column.embedding_column(cat_col, 3)
                oneHot_columns.append(col)
                dataColumns.append(feature)
                inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
            else:
                cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=data_vocab[feature])
                col = tf.feature_column.embedding_column(cat_col, 10)
                emb_columns.append(col)
                dataColumns.append(feature)
                inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
        elif feature in ITEM_NUMBER_COLUMNS:
            col = tf.feature_column.numeric_column(feature)
            number_columns.append(col)
            dataColumns.append(feature)
            inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='float32')
    return emb_columns, number_columns, oneHot_columns, dataColumns, inputs


def train(emb_columns, number_columns, oneHot_columns, inputs, train_dataset):
    # wide part: dense features built from the feature columns; deep part: small MLP on top
    wide = tf.keras.layers.DenseFeatures(emb_columns + number_columns + oneHot_columns)(inputs)
    deep = tf.keras.layers.Dense(64, activation='relu')(wide)
    deep = tf.keras.layers.Dropout(0.2)(deep)
    concat_layer = tf.keras.layers.concatenate([wide, deep], axis=1)
    # deep = tf.keras.layers.Dense(64, activation='relu')(deep)
    # deep = tf.keras.layers.Dropout(0.5)(deep)
    output_layer = tf.keras.layers.Dense(1, activation='sigmoid')(concat_layer)
    # output_layer = FM(1)(deep)
    model = tf.keras.Model(inputs, output_layer)

    # compile the model, set loss function, optimizer and evaluation metrics
    model.compile(
        loss='binary_crossentropy',
        optimizer='adam',
        metrics=['accuracy', tf.keras.metrics.AUC(curve='ROC'), tf.keras.metrics.AUC(curve='PR')])

    # train the model
    print("train start...")
    model.fit(train_dataset, epochs=5)
    print("train end...")
    print("train save...")
    model.save(model_file, include_optimizer=False, save_format='tf')
    return model


def evaluate(model, test_dataset):
    if not model:
        print("loading model...")
        model = tf.keras.models.load_model(model_file)
    # evaluate the model
    timestmp1 = int(round(time.time()))
    print("evaluate:")
    test_loss, test_accuracy, test_roc_auc, test_pr_auc = model.evaluate(test_dataset)
    print('\n\nTest Loss {}, Test Accuracy {}, Test ROC AUC {}, Test PR AUC {}'.format(test_loss, test_accuracy, test_roc_auc, test_pr_auc))
    print("evaluation time (s):", int(round(time.time())) - timestmp1)


def predict(model_path, df):
    print("loading model...")
    model_new = tf.keras.models.load_model(model_path)
    # model_new.summary()
    print("model loaded...")
    # model = tf.keras.models.model_from_json(model.to_json)
    n = 1000
    dd = dict(df.sample(n=n))
    for i in range(10):
        timestmp1 = int(round(time.time() * 1000))
        model_new.predict(dd, batch_size=10000)
        print("samples: {}, prediction time (ms): {}".format(n, int(round(time.time() * 1000)) - timestmp1))


def addDays(n, format="%Y%m%d"):
    return (date.today() + timedelta(days=n)).strftime(format)


if __name__ == '__main__':
    curTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    print("train_service started at: {}".format(curTime))
    splitTimestamp = int(time.mktime(time.strptime(addDays(-1), "%Y%m%d")))

    # load the data vocabulary (the redis loader is commented out below)
    print("loading vocabulary...")
    data_vocab = json.load(open(configUtils.VOCAB_PATH, mode='r', encoding='utf-8'))
    print("vocabulary keys:", str(data_vocab.keys()))
    # data_vocab = getDataVocabFromRedis(VERSION)
    assert data_vocab

    timestmp1 = int(round(time.time()))
    df_train = loadData(data_path_train)
    print(df_train.dtypes)
    # split train/test by timestamp: the most recent day becomes the test set
    df_test = df_train.loc[df_train['timestamp'] >= splitTimestamp]
    df_train = df_train.loc[df_train['timestamp'] < splitTimestamp]
    # df_test = loadData(data_path_test)
    timestmp2 = int(round(time.time()))
    print("data loading time (s): {}".format(timestmp2 - timestmp1))

    # get training columns
    columns = df_train.columns.tolist()
    print("raw data columns:")
    print(columns)
    emb_columns, number_columns, oneHot_columns, datasColumns, inputs = getTrainColumns(columns, data_vocab)
    print("training columns:")
    print(datasColumns)

    df_train = df_train[datasColumns + ["label"]]
    df_test = df_test[datasColumns + ["label"]]
    trainSize = df_train["label"].count()
    print("trainSize: {}".format(trainSize))
    testSize = df_test["label"].count()
    print("trainSize: {}, testSize: {}".format(trainSize, testSize))

    # data type conversion
    df_train = csvTypeConvert(datasColumns, df_train, data_vocab)
    df_test = csvTypeConvert(datasColumns, df_test, data_vocab)

    # build the training dataset
    train_data = getDataSet(df_train, shuffleSize=trainSize)

    print("train start...")
    timestmp3 = int(round(time.time()))
    model = train(emb_columns, number_columns, oneHot_columns, inputs, train_data)
    timestmp4 = int(round(time.time()))
    print("train end... elapsed (h): {}".format((timestmp4 - timestmp3) / 60 / 60))

    if testSize > 0:
        test_data = getDataSet(df_test, shuffleSize=testSize)
        evaluate(model, test_data)
        # predict(model_file, test_data)
    pass