import tensorflow as tf import json import pandas as pd import time import sys import os sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))) import utils.connUtils as connUtils import utils.configUtils as configUtils ITEM_NUMBER_COLUMNS = ["item_"+c for c in ["smart_rank2"]] embedding_columns = ["itemid","userid"] + ["item_"+c for c in ["doctor_id","hospital_id"]] multi_columns = ["tags_v3","first_demands","second_demands","first_solutions","second_solutions","first_positions","second_positions"] one_hot_columns = ["item_"+c for c in ["service_type","doctor_type","doctor_famous","hospital_city_tag_id","hospital_type","hospital_is_high_quality"]] # history_columns = ["userRatedHistory"] # 数据加载 # data_path_train = "/Users/zhigangzheng/Desktop/work/guoyu/service_sort/train/part-00000-a61205d1-ad4e-4fa7-895d-ad8db41189e6-c000.csv" # data_path_test = "/Users/zhigangzheng/Desktop/work/guoyu/service_sort/train/part-00000-a61205d1-ad4e-4fa7-895d-ad8db41189e6-c000.csv" VERSION = configUtils.SERVICE_VERSION trainDay = time.strftime("%Y%m%d", time.localtime()) data_path_test = "/data/files/service_feature_{}_test.csv".format(VERSION) model_file = configUtils.SERVICE_MODEL_PATH + "/" + trainDay def is_float(s): try: float(s) return True except ValueError: return False #数据字典 def getDataVocabFromRedis(version): conn = connUtils.getRedisConn() key = "Strategy:rec:vocab:service:"+version dataVocabStr = conn.get(key) if dataVocabStr: dataVocab = json.loads(str(dataVocabStr, encoding="utf-8"),encoding='utf-8') print("-----data_vocab-----") for k, v in dataVocab.items(): print(k, len(v)) else: dataVocab = None return dataVocab # 数据类型转换 def csvTypeConvert(columns,df,data_vocab): df["label"] = df["label"].astype("int") for k in columns: # 离散na值填充 if data_vocab.get(k): df[k] = df[k].fillna("-1") df[k] = df[k].astype("string") elif k != "label": # df[k] = df[k].map(lambda x:x if is_float(x) else 0) df[k] = df[k].fillna(0) df[k] = df[k].astype("float") # print(df.dtypes) return df def loadData(data_path): print("读取数据...") timestmp1 = int(round(time.time() * 1000)) df = pd.read_csv(data_path, sep="|") timestmp2 = int(round(time.time() * 1000)) print("读取数据耗时ms:{}".format(timestmp2 - timestmp1)) return df def getTrainColumns(train_columns,data_vocab): emb_columns = [] number_columns = [] oneHot_columns = [] dataColumns = [] inputs = {} # 离散特征 for feature in train_columns: if data_vocab.get(feature): if feature.count("__")>0: cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature,vocabulary_list=data_vocab[feature]) col = tf.feature_column.embedding_column(cat_col, 5) emb_columns.append(col) dataColumns.append(feature) inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string') elif feature in one_hot_columns or feature.count("Bucket") > 0: cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=data_vocab[feature]) # col = tf.feature_column.indicator_column(cat_col) col = tf.feature_column.embedding_column(cat_col, 3) oneHot_columns.append(col) dataColumns.append(feature) inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string') else: cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature,vocabulary_list=data_vocab[feature]) col = tf.feature_column.embedding_column(cat_col, 10) emb_columns.append(col) dataColumns.append(feature) inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string') elif feature in ITEM_NUMBER_COLUMNS: col = tf.feature_column.numeric_column(feature) number_columns.append(col) dataColumns.append(feature) inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='float32') return emb_columns,number_columns,oneHot_columns,dataColumns,inputs def test(df_train,n=100): import requests # ddd = {} # datasColumnss = df_train.columns.to_list() # dd = df_train.sample(n=n) # for c in datasColumnss: # vvv = dd[c].tolist() # ddd[c] = vvv # # print(ddd) # pre_data = {"inputs":ddd} # pre_data = json.dumps(pre_data) # pre_data = pre_data.replace("'",'"') # print(pre_data) # print("测试样本数:{},测试耗时ms:{}".format(n,int(round(time.time()*1000))- timestmp1)) # print(r) for i in range(100): ddd = {} datasColumnss = df_train.columns.to_list() dd = df_train.sample(n=n) for c in datasColumnss: vvv = dd[c].tolist() ddd[c] = vvv # print(ddd) pre_data = {"inputs": ddd} pre_data = json.dumps(pre_data) # pre_data = pre_data.replace("'",'"') # print(pre_data) timestmp1 = int(round(time.time() * 1000)) r = requests.post('http://tensorserving.paas-develop.env/v1/models/service:predict', data=pre_data) print("测试样本数:{},测试耗时ms:{}".format(n, int(round(time.time() * 1000)) - timestmp1)) print(r) if __name__ == '__main__': n = int(sys.argv[1]) # redis中加载数据字典 print("redis 中加载模型字典...") data_vocab = getDataVocabFromRedis(VERSION) assert data_vocab print("读取数据...") timestmp1 = int(round(time.time())) df_test = loadData(data_path_test) timestmp2 = int(round(time.time())) print("读取数据耗时s:{}".format(timestmp2 - timestmp1)) # 获取训练列 columns = df_test.columns.tolist() print(columns) emb_columns,number_columns,oneHot_columns, datasColumns,inputs = getTrainColumns(columns, data_vocab) df_test = df_test[datasColumns + ["label"]] # 数据类型转换 df_test = csvTypeConvert(datasColumns,df_test,data_vocab) test(df_test,n)