Commit 338ffc4e authored by 郭羽

美购 fine-ranking model

parent 0499a77c
import tensorflow as tf
import json
import pandas as pd
import time
import utils.connUtils as connUtils
ITEM_NUMBER_COLUMNS = ["smart_rank2"]
embedding_columns = ["itemid","userid","doctor_id","hospital_id"]
multi_columns = ["tags_v3","first_demands","second_demands","first_solutions","second_solutions","first_positions","second_positions"]
one_hot_columns = ["service_type","doctor_type","doctor_famous","hospital_city_tag_id","hospital_type","hospital_is_high_quality"]
# history_columns = ["userRatedHistory"]
# Data loading
data_path_train = "/Users/zhigangzheng/Desktop/work/guoyu/service_sort/train/part-00000-a61205d1-ad4e-4fa7-895d-ad8db41189e6-c000.csv"
data_path_test = "/Users/zhigangzheng/Desktop/work/guoyu/service_sort/train/part-00000-a61205d1-ad4e-4fa7-895d-ad8db41189e6-c000.csv"
# data_path_train = "/data/files/service_feature_train.csv"
# data_path_test = "/data/files/service_feature_test.csv"
version = "v1"
model_file = "service_mlp_"+version
# Data vocabulary (feature -> vocabulary list), loaded from Redis
def getDataVocabFromRedis(version):
    conn = connUtils.getRedisConn()
    key = "Strategy:rec:vocab:service:" + version
    dataVocabStr = conn.get(key)
    if dataVocabStr:
        dataVocab = json.loads(dataVocabStr)
        print("-----data_vocab-----")
        for k, v in dataVocab.items():
            print(k, len(v))
    else:
        dataVocab = None
    return dataVocab
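
# Hedged sketch (not part of the original commit) of the payload this loader assumes is
# stored under "Strategy:rec:vocab:service:<version>": a JSON object mapping each
# categorical feature name to its list of string vocabulary values. The feature names
# come from the column lists above; the concrete values are purely illustrative.
#
#   {
#       "itemid": ["1001", "1002", "-1"],
#       "service_type": ["0", "1", "2"],
#       "tags_v3": ["tag_a", "tag_b", "-1"]
#   }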
# Data type conversion
def csvTypeConvert(df, data_vocab):
    # Fill NA values for categorical features and cast to string
    for k, v in data_vocab.items():
        df[k] = df[k].fillna("-1")
        df[k] = df[k].astype("string")
    # Fill NA values for numeric features and cast to float
    for k in ITEM_NUMBER_COLUMNS:
        df[k] = df[k].fillna(0.0)
        df[k] = df[k].astype("float")
    df["label"] = df["label"].astype("int")
    return df
def loadData(data_path):
    print("Loading data...")
    timestmp1 = int(round(time.time() * 1000))
    df = pd.read_csv(data_path, sep="|")
    timestmp2 = int(round(time.time() * 1000))
    print("Data load time (ms): {}".format(timestmp2 - timestmp1))
    return df
def getDataSet(df, shuffleSize=10000, batchSize=128):
    # print(df.dtypes)
    # Split off the label column and build a shuffled, batched tf.data pipeline
    labels = df.pop('label')
    dataSet = tf.data.Dataset.from_tensor_slices((dict(df), labels)).shuffle(shuffleSize).batch(batchSize)
    return dataSet
def getTrainColumns(train_columns, data_vocab):
    columns = []
    for feature in train_columns:
        # Categorical features (present in the vocabulary)
        if data_vocab.get(feature):
            if feature.startswith("userRatedHistory") or feature.count("__") > 0 or feature in embedding_columns:
                # High-cardinality and cross features -> embedding columns
                cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=data_vocab[feature])
                col = tf.feature_column.embedding_column(cat_col, 10)
                columns.append(col)
            elif feature in one_hot_columns or feature.count("Bucket") > 0:
                # Low-cardinality and bucketized features -> one-hot (indicator) columns
                cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=data_vocab[feature])
                col = tf.feature_column.indicator_column(cat_col)
                columns.append(col)
        # Numeric features
        elif feature in ITEM_NUMBER_COLUMNS or feature.endswith("RatingAvg") or feature.endswith("RatingStddev"):
            col = tf.feature_column.numeric_column(feature)
            columns.append(col)
    return columns
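
# Hedged note (assumption, not in the original commit): for a feature with a vocabulary
# of size V, embedding_column maps each id to a learned 10-dim dense vector, while
# indicator_column produces a V-dim one-/multi-hot vector; numeric_column passes the raw
# float through. DenseFeatures later concatenates all of these into one input tensor.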
def train(columns, train_dataset):
    model = tf.keras.Sequential([
        tf.keras.layers.DenseFeatures(columns),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid'),
    ])
    # Compile the model: set loss function, optimizer and evaluation metrics
    model.compile(
        loss='mse',
        optimizer='adam',
        metrics=['accuracy', tf.keras.metrics.AUC(curve='ROC'), tf.keras.metrics.AUC(curve='PR')])
    # Train the model
    print("train start...")
    model.fit(train_dataset, epochs=5)
    print("train end...")
    print("train save...")
    model.save(model_file, include_optimizer=False, save_format='tf')
    return model
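
# Hedged sketch (assumption, not in the original commit): reloading the exported
# SavedModel for offline scoring on a dataset built with getDataSet.
#
#   reloaded = tf.keras.models.load_model(model_file)
#   scores = reloaded.predict(test_data)  # sigmoid scores in [0, 1]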
def evaluate(model, test_dataset):
    # Evaluate the model on the held-out test set
    timestmp1 = int(round(time.time()))
    print("evaluate:")
    test_loss, test_accuracy, test_roc_auc, test_pr_auc = model.evaluate(test_dataset)
    print('\n\nTest Loss {}, Test Accuracy {}, Test ROC AUC {}, Test PR AUC {}'.format(test_loss, test_accuracy,
                                                                                       test_roc_auc, test_pr_auc))
    print("Evaluation time (s):", int(round(time.time())) - timestmp1)
if __name__ == '__main__':
    # Load the feature vocabulary from Redis
    print("Loading the feature vocabulary from Redis...")
    data_vocab = getDataVocabFromRedis(version)
    assert data_vocab

    print("Loading data...")
    timestmp1 = int(round(time.time() * 1000))
    df_train = loadData(data_path_train)
    df_test = loadData(data_path_test)
    timestmp2 = int(round(time.time() * 1000))
    print("Data load time (ms): {}".format(timestmp2 - timestmp1))

    # Keep only the vocabulary, numeric and label columns
    df_train = df_train[list(data_vocab.keys()) + ITEM_NUMBER_COLUMNS + ["label"]]
    df_test = df_test[list(data_vocab.keys()) + ITEM_NUMBER_COLUMNS + ["label"]]
    trainSize = df_train["label"].count()
    testSize = df_test["label"].count()
    print("trainSize:{},testSize:{}".format(trainSize, testSize))

    # Data type conversion
    df_train = csvTypeConvert(df_train, data_vocab)
    df_test = csvTypeConvert(df_test, data_vocab)
    columns = df_train.columns.tolist()

    # Build the training and test datasets
    train_data = getDataSet(df_train, shuffleSize=trainSize)
    test_data = getDataSet(df_test, shuffleSize=testSize)

    # Build the feature columns
    columns = getTrainColumns(columns, data_vocab)

    model = train(columns, train_data)
    # evaluate(model, test_data)

dataPath=/data/files
content_type="service"
cd $dataPath

# Merge the per-partition HDFS output of a directory into a single local CSV with one
# header row. "logging" is assumed to be provided by the surrounding environment.
function mergeGetFile(){
    if [ ! -n "$1" ];then
        echo "dir doesn't exist, don't run this shell"
        exit 1
    fi
    rm -f $dataPath/$1.csv
    logging "rm -f $dataPath/$1.csv"
    /opt/hadoop/bin/hdfs dfs -getmerge /$1 $dataPath/$1.csv
    logging "/opt/hadoop/bin/hdfs dfs -getmerge /$1 $dataPath/$1.csv success"
    # Keep a single header line: drop the repeated headers that getmerge concatenates
    # from each partition, then append the data rows after it.
    head -1 $1.csv > $1.csv.head
    cat $1.csv | grep -v "`cat $1.csv.head`" >> $1.csv.head
    mv $1.csv.head $1.csv
    /opt/hadoop/bin/hdfs dfs -rmr /$1
    logging "/opt/hadoop/bin/hdfs dfs -rmr /$1 success"
}

mergeGetFile ${content_type}_feature_train
mergeGetFile ${content_type}_feature_test

path=/srv/apps/serviceRec
day_count=$1
content_type="service"
pythonFile=${path}/shell/service_feature_csv_export.py
#log_file=~/${content_type}_feature_csv_export.log
/opt/hadoop/bin/hdfs dfs -rmr /${content_type}_feature_train
/opt/hadoop/bin/hdfs dfs -rmr /${content_type}_feature_test
/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 70 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar ${pythonFile} $day_count
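
# Hedged note (assumption, not in the original commit): the Spark job above takes
# day_count as its only argument and is expected to write /service_feature_train and
# /service_feature_test on HDFS, which the getmerge script above pulls down to
# /data/files/*.csv for the training script.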

import redis

def getRedisConn():
    # pool = redis.ConnectionPool(host="172.16.50.145",password="XfkMCCdWDIU%ls$h",port=6379,db=0)
    # conn = redis.Redis(connection_pool=pool)
    # conn = redis.Redis(host="172.16.50.145", port=6379, password="XfkMCCdWDIU%ls$h",db=0)
    conn = redis.Redis(host="172.18.51.10", port=6379, db=0)  # test environment
    return conn
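
# Hedged usage sketch (assumption, not in the original commit): publishing a vocabulary
# so that getDataVocabFromRedis can read it back under the same key. vocab_dict is a
# hypothetical dict mapping feature names to vocabulary lists.
#
#   import json
#   conn = getRedisConn()
#   conn.set("Strategy:rec:vocab:service:v1", json.dumps(vocab_dict))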