Commit 04ae41dc authored by 郭羽's avatar 郭羽

特征工程优化

parent 5b8a4fe3
...@@ -714,7 +714,7 @@ def get_service_feature_df(spark): ...@@ -714,7 +714,7 @@ def get_service_feature_df(spark):
_source = res['_source'] _source = res['_source']
data = parseSource(_source) data = parseSource(_source)
datas.append(data) datas.append(data)
print(len(datas)) print("item size:",len(datas))
dataRDD = spark.sparkContext.parallelize(datas) dataRDD = spark.sparkContext.parallelize(datas)
itemColumns = ['id', 'lowest_price', 'smart_rank2', 'case_count', 'service_type', 'ordered_user_ids_count', itemColumns = ['id', 'lowest_price', 'smart_rank2', 'case_count', 'service_type', 'ordered_user_ids_count',
...@@ -754,7 +754,6 @@ if __name__ == '__main__': ...@@ -754,7 +754,6 @@ if __name__ == '__main__':
spark = get_spark("service_feature_csv_export") spark = get_spark("service_feature_csv_export")
spark.sparkContext.setLogLevel("ERROR") spark.sparkContext.setLogLevel("ERROR")
itemDF = get_service_feature_df(spark)
# 行为数据 # 行为数据
clickSql = getClickSql(startDay,endDay) clickSql = getClickSql(startDay,endDay)
print("--------") print("--------")
...@@ -769,15 +768,13 @@ if __name__ == '__main__': ...@@ -769,15 +768,13 @@ if __name__ == '__main__':
.withColumnRenamed("card_id", "itemid")\ .withColumnRenamed("card_id", "itemid")\
.withColumnRenamed("page_stay", "rating") .withColumnRenamed("page_stay", "rating")
print(itemDF.columns)
print(itemDF.show(10, truncate=False))
# print(userDF.columns)
# print(userDF.show(10))
print(ratingDF.columns) print(ratingDF.columns)
print(ratingDF.show(10, truncate=False)) print(ratingDF.show(10, truncate=False))
itemDF = get_service_feature_df(spark)
print(itemDF.columns)
print(itemDF.show(10, truncate=False))
print("添加label...") print("添加label...")
ratingSamplesWithLabel = addSampleLabel(ratingDF) ratingSamplesWithLabel = addSampleLabel(ratingDF)
posCount = ratingSamplesWithLabel.filter(F.col("label")==1).count() posCount = ratingSamplesWithLabel.filter(F.col("label")==1).count()
......
import tensorflow as tf
import json
import pandas as pd
import time
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
import utils.connUtils as connUtils
import utils.configUtils as configUtils
ITEM_NUMBER_COLUMNS = ["item_"+c for c in ["smart_rank2"]]
embedding_columns = ["itemid","userid"] + ["item_"+c for c in ["doctor_id","hospital_id"]]
multi_columns = ["tags_v3","first_demands","second_demands","first_solutions","second_solutions","first_positions","second_positions"]
one_hot_columns = ["item_"+c for c in ["service_type","doctor_type","doctor_famous","hospital_city_tag_id","hospital_type","hospital_is_high_quality"]]
# history_columns = ["userRatedHistory"]
# 数据加载
# data_path_train = "/Users/zhigangzheng/Desktop/work/guoyu/service_sort/train/part-00000-a61205d1-ad4e-4fa7-895d-ad8db41189e6-c000.csv"
# data_path_test = "/Users/zhigangzheng/Desktop/work/guoyu/service_sort/train/part-00000-a61205d1-ad4e-4fa7-895d-ad8db41189e6-c000.csv"
VERSION = configUtils.SERVICE_VERSION
trainDay = time.strftime("%Y%m%d%H", time.localtime())
data_path_train = "/data/files/service_feature_{}_train.csv".format(VERSION)
data_path_test = "/data/files/service_feature_{}_test.csv".format(VERSION)
model_file = "/srv/apps/tensorServing_models/test/service/" + trainDay
def is_float(s):
try:
float(s)
return True
except ValueError:
return False
#数据字典
def getDataVocabFromRedis(version):
conn = connUtils.getRedisConn()
key = "Strategy:rec:vocab:service:"+version
dataVocabStr = conn.get(key)
if dataVocabStr:
dataVocab = json.loads(str(dataVocabStr, encoding="utf-8"),encoding='utf-8')
print("-----data_vocab-----")
for k, v in dataVocab.items():
print(k, len(v))
else:
dataVocab = None
return dataVocab
# 数据类型转换
def csvTypeConvert(columns,df,data_vocab):
df["label"] = df["label"].astype("int")
for k in columns:
# 离散na值填充
if data_vocab.get(k):
df[k] = df[k].fillna("-1")
df[k] = df[k].astype("string")
elif k != "label":
# df[k] = df[k].map(lambda x:x if is_float(x) else 0)
df[k] = df[k].fillna(0)
df[k] = df[k].astype("float")
# print(df.dtypes)
return df
def loadData(data_path):
print("读取数据...")
timestmp1 = int(round(time.time() * 1000))
df = pd.read_csv(data_path, sep="|")
timestmp2 = int(round(time.time() * 1000))
print("读取数据耗时ms:{}".format(timestmp2 - timestmp1))
return df
def getDataSet(df,shuffleSize = 10000,batchSize=128):
# print(df.dtypes)
labels = df.pop('label')
dataSet = tf.data.Dataset.from_tensor_slices((dict(df), labels)).shuffle(shuffleSize).batch(batchSize)
return dataSet
def getTrainColumns(train_columns,data_vocab):
emb_columns = []
number_columns = []
oneHot_columns = []
dataColumns = []
inputs = {}
# 离散特征
for feature in train_columns:
if data_vocab.get(feature):
if feature.count("__")>0:
cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature,vocabulary_list=data_vocab[feature])
col = tf.feature_column.embedding_column(cat_col, 5)
emb_columns.append(col)
dataColumns.append(feature)
inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
elif feature in one_hot_columns or feature.count("Bucket") > 0:
cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=data_vocab[feature])
# col = tf.feature_column.indicator_column(cat_col)
col = tf.feature_column.embedding_column(cat_col, 3)
oneHot_columns.append(col)
dataColumns.append(feature)
inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
else:
cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature,vocabulary_list=data_vocab[feature])
col = tf.feature_column.embedding_column(cat_col, 10)
emb_columns.append(col)
dataColumns.append(feature)
inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
elif feature in ITEM_NUMBER_COLUMNS:
col = tf.feature_column.numeric_column(feature)
number_columns.append(col)
dataColumns.append(feature)
inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='float32')
return emb_columns,number_columns,oneHot_columns,dataColumns,inputs
def train(emb_columns, number_columns, oneHot_columns, inputs, train_dataset):
wide = tf.keras.layers.DenseFeatures(emb_columns + number_columns + oneHot_columns)(inputs)
deep = tf.keras.layers.Dense(64, activation='relu')(wide)
deep = tf.keras.layers.Dropout(0.2)(deep)
# concat_layer = tf.keras.layers.concatenate([wide, deep], axis=1)
deep = tf.keras.layers.Dense(64, activation='relu')(deep)
deep = tf.keras.layers.Dropout(0.5)(deep)
output_layer = tf.keras.layers.Dense(1, activation='sigmoid')(deep)
model = tf.keras.Model(inputs, output_layer)
# compile the model, set loss function, optimizer and evaluation metrics
model.compile(
loss='mse',
optimizer='adam',
metrics=['accuracy', tf.keras.metrics.AUC(curve='ROC'), tf.keras.metrics.AUC(curve='PR')])
# train the model
print("train start...")
model.fit(train_dataset, epochs=5)
print("train end...")
print("train save...")
model.save(model_file, include_optimizer=False, save_format='tf')
return model
def evaluate(model,test_dataset):
if not model:
print("加载模型中")
model = tf.keras.models.load_model(model_file)
# evaluate the model
timestmp1 = int(round(time.time()))
print("evaluate:")
test_loss, test_accuracy, test_roc_auc, test_pr_auc = model.evaluate(test_dataset)
print('\n\nTest Loss {}, Test Accuracy {}, Test ROC AUC {}, Test PR AUC {}'.format(test_loss, test_accuracy,
test_roc_auc, test_pr_auc))
print("验证耗时s:", int(round(time.time())) - timestmp1)
def predict(model_path,df):
print("加载模型中...")
model_new = tf.keras.models.load_model(model_path)
# model_new.summary()
print("模型加载完成...")
# model = tf.keras.models.model_from_json(model.to_json)
n = 1000
dd = dict(df.sample(n=n))
for i in range(10):
timestmp1 = int(round(time.time() * 1000))
model_new.predict(dd, batch_size=10000)
print("测试样本数:{},测试耗时ms:{}".format(n, int(round(time.time() * 1000)) - timestmp1))
if __name__ == '__main__':
curTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print("train_service执行时间:{}".format(curTime))
# redis中加载数据字典
print("redis 中加载模型字典...")
data_vocab = getDataVocabFromRedis(VERSION)
assert data_vocab
print("读取数据...")
timestmp1 = int(round(time.time()))
df_train = loadData(data_path_train)
# df_test = loadData(data_path_test)
timestmp2 = int(round(time.time()))
print("读取数据耗时s:{}".format(timestmp2 - timestmp1))
# 获取训练列
columns = df_train.columns.tolist()
print("原始数据列:")
print(columns)
emb_columns,number_columns,oneHot_columns, datasColumns,inputs = getTrainColumns(columns, data_vocab)
print("训练列:")
print(datasColumns)
df_train = df_train[datasColumns + ["label"]]
# df_test = df_test[datasColumns + ["label"]]
trainSize = df_train["label"].count()
print("trainSize:{}".format(trainSize))
# testSize = df_test["label"].count()
# print("trainSize:{},testSize{}".format(trainSize,testSize))
# 数据类型转换
df_train = csvTypeConvert(datasColumns,df_train,data_vocab)
# df_test = csvTypeConvert(datasColumns,df_test,data_vocab)
# 获取训练数据
train_data = getDataSet(df_train,shuffleSize=trainSize,)
# test_data = getDataSet(df_test,shuffleSize=testSize)
print("train start...")
timestmp3 = int(round(time.time()))
model = train(emb_columns,number_columns,oneHot_columns,inputs,train_data)
timestmp4 = int(round(time.time()))
print("train end...耗时h:{}".format((timestmp4 - timestmp3)/60/60))
# evaluate(model,test_data)
# predict(model_file,test_data)
pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment