Commit e5cccd39 authored by 郭羽

美购 fine-ranking model: runtime optimization

parent 4ad672a6
@@ -6,4 +6,4 @@ pythonFile=${path}/spark/featureEng.py
#log_file=~/${content_type}_feature_csv_export.log
/opt/hadoop/bin/hdfs dfs -rmr /${content_type}_feature_v1_train
/opt/hadoop/bin/hdfs dfs -rmr /${content_type}_feature_v1_test
/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 16g --executor-memory 2g --executor-cores 1 --num-executors 8 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar ${pythonFile} $day_count
/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 4g --executor-memory 2g --executor-cores 1 --num-executors 8 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar ${pythonFile} $day_count
#cd /srv/apps/tensorServing_models && rm -rf /srv/apps/tensorServing_models/service/ && mkdir service
cd /srv/apps/tensorServing_models && rm -rf service_copy && mv service service_copy && mkdir service
source /srv/envs/serviceRec/bin/activate
python /srv/apps/serviceRec/train/train_service.py > /srv/apps/serviceRec/logs/train_service_log.log
\ No newline at end of file
@@ -231,6 +231,7 @@ def getDataVocab(samples):
dataVocab = {}
multiVocab = {}
# multi-value features
for c in ITEM_MULTI_COLUMN_EXTRA_MAP.keys():
datas = samples.select(c).distinct().collect()
tagSet = set()
@@ -243,10 +244,21 @@ def getDataVocab(samples):
multiVocab[c] = list(tagSet)
samples = samples.drop(c)
# id-type features
for c in ["userid","itemid"]:
datas = samples.select(c).distinct().collect()
vocabSet = set()
for d in datas:
if d[c]:
vocabSet.add(str(d[c]))
vocabSet.add("-1") # 空值的默认
dataVocab[c] = list(vocabSet)
pass
for c in samples.columns:
print("col",c)
# check whether the column ends with Bucket or is a categorical feature
if c.endswith("Bucket") or c.endswith("userRatedHistory") or c in ITEM_CATE_COLUMNS+["userid","itemid"]:
if c.endswith("Bucket"):
datas = samples.select(c).distinct().collect()
vocabSet = set()
for d in datas:
@@ -254,6 +266,8 @@ def getDataVocab(samples):
vocabSet.add(str(d[c]))
vocabSet.add("-1")# 空值的默认
dataVocab[c] = list(vocabSet)
elif c.count("userRatedHistory") > 0:
dataVocab[c] = dataVocab["itemid"]
else:
# check whether this is a multi-value discrete column
for cc, v in multiVocab.items():
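For reference, a minimal, self-contained sketch (hypothetical toy data, pyspark only) of the id-vocabulary pattern used above: each id-style column is reduced to its distinct non-null values plus a "-1" default bucket for nulls.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
samples = spark.createDataFrame([("u1", 101), ("u2", 102), ("u2", None)], ["userid", "itemid"])

dataVocab = {}
for c in ["userid", "itemid"]:
    # distinct values of the column, dropping nulls, plus a default bucket
    vocabSet = {str(row[c]) for row in samples.select(c).distinct().collect() if row[c] is not None}
    vocabSet.add("-1")  # default for null values
    dataVocab[c] = list(vocabSet)
print(dataVocab)
```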
@@ -290,6 +304,25 @@ def collectFeaturesToDict(samples,columns,prefix):
print(prefix,len(resDatas))
return {d[idCol]:json.dumps(d.asDict(),ensure_ascii=False) for d in resDatas}
def featuresToRedis(samples,columns,prefix,redisKey):
idCol = prefix+"id"
timestampCol = idCol+"_timestamp"
def toRedis(datas):
conn = connUtils.getRedisConn()
for d in datas:
k = d[idCol]
v = json.dumps(d.asDict(), ensure_ascii=False)
newKey = redisKey + k
conn.set(newKey, v)
conn.expire(newKey, 60 * 60 * 24 * 7)
# use the timestamp to keep only the latest record for each user
prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol))
resDF = samples.join(prefixSamples, on=[idCol], how='left').where(F.col("timestamp") == F.col(timestampCol))
distinctDF = resDF.select(*columns).distinct()
print(prefix,distinctDF.count())
distinctDF.foreachPartition(toRedis)
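The new featuresToRedis above avoids collecting every feature row to the driver: the latest record per id is picked with a groupBy on timestamp, and each partition is written to Redis directly from the executors via foreachPartition. A hedged sketch, not part of this commit, of how the per-partition writer could additionally batch commands with a Redis pipeline; connUtils.getRedisConn and the 7-day TTL mirror the code above, the 1000-command flush size is an assumed value, and the extra arguments would be bound via a closure the way toRedis is above.

```python
import json

def toRedisPipelined(datas, idCol, redisKey):
    # one connection per partition; SET/EXPIRE commands are flushed in batches
    conn = connUtils.getRedisConn()  # project helper, same as used above
    pipe = conn.pipeline(transaction=False)
    for i, d in enumerate(datas):
        newKey = redisKey + d[idCol]
        pipe.set(newKey, json.dumps(d.asDict(), ensure_ascii=False))
        pipe.expire(newKey, 60 * 60 * 24 * 7)  # keep features for 7 days
        if (i + 1) % 1000 == 0:  # assumed batch size
            pipe.execute()
    pipe.execute()
```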
"""
Data loading
@@ -654,15 +687,17 @@ if __name__ == '__main__':
# store user feature data in redis
print("user feature to redis...")
userDatas = collectFeaturesToDict(samplesWithUserFeatures, user_columns, "user")
featureToRedis(FEATURE_USER_KEY, userDatas)
featuresToRedis(samplesWithUserFeatures, user_columns, "user", FEATURE_USER_KEY)
# userDatas = collectFeaturesToDict(samplesWithUserFeatures, user_columns, "user")
# featureToRedis(FEATURE_USER_KEY, userDatas)
timestmp5 = int(round(time.time()))
print("user feature to redis 耗时s:{}".format(timestmp5 - timestmp4))
# item特征数据存入redis
print("item feature to redis...")
itemDatas = collectFeaturesToDict(samplesWithUserFeatures, item_columns, "item")
featureToRedis(FEATURE_ITEM_KEY, itemDatas)
featuresToRedis(samplesWithUserFeatures, item_columns, "item", FEATURE_ITEM_KEY)
# itemDatas = collectFeaturesToDict(samplesWithUserFeatures, item_columns, "item")
# featureToRedis(FEATURE_ITEM_KEY, itemDatas)
timestmp6 = int(round(time.time()))
print("item feature to redis 耗时s:{}".format(timestmp6 - timestmp5))
@@ -675,7 +710,7 @@ if __name__ == '__main__':
trainSamples = samplesWithUserFeatures.select(*train_columns)
print("write to hdfs start...")
splitTimestamp = int(time.mktime(time.strptime(endDay, "%Y%m%d")))
splitAndSaveTrainingTestSamplesByTimeStamp(samplesWithUserFeatures, splitTimestamp, TRAIN_FILE_PATH)
splitAndSaveTrainingTestSamplesByTimeStamp(trainSamples, splitTimestamp, TRAIN_FILE_PATH)
print("write to hdfs success...")
timestmp7 = int(round(time.time()))
print("数据写入hdfs 耗时s:{}".format(timestmp7 - timestmp6))
@@ -105,20 +105,6 @@ def getTrainColumns(train_columns,data_vocab):
emb_columns.append(col)
dataColumns.append(feature)
inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
# if feature.startswith("userRatedHistory") or feature.count("__") > 0 or feature in embedding_columns:
# cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=data_vocab[feature])
# # col = tf.feature_column.embedding_column(cat_col, 10)
# col = tf.feature_column.indicator_column(cat_col)
# columns.append(col)
# dataColumns.append(feature)
# inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
# elif feature in one_hot_columns or feature.count("Bucket") > 0:
# cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=data_vocab[feature])
# col = tf.feature_column.indicator_column(cat_col)
# columns.append(col)
# dataColumns.append(feature)
# inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
elif feature in ITEM_NUMBER_COLUMNS:
col = tf.feature_column.numeric_column(feature)
@@ -126,7 +112,6 @@ def getTrainColumns(train_columns,data_vocab):
dataColumns.append(feature)
inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='float32')
return emb_columns,number_columns,oneHot_columns,dataColumns,inputs
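getTrainColumns now returns the embedding, numeric and one-hot feature columns separately along with the Keras inputs. A hedged sketch of the usual way such feature columns are combined through a DenseFeatures layer into a model; the layer sizes here are assumptions, not the values used by train() below.

```python
import tensorflow as tf

def buildModelSketch(emb_columns, number_columns, oneHot_columns, inputs):
    # concatenate all feature columns into one dense tensor, then an MLP head
    features = tf.keras.layers.DenseFeatures(emb_columns + number_columns + oneHot_columns)(inputs)
    x = tf.keras.layers.Dense(128, activation='relu')(features)
    x = tf.keras.layers.Dense(128, activation='relu')(x)
    output_layer = tf.keras.layers.Dense(1, activation='sigmoid')(x)
    return tf.keras.Model(inputs, output_layer)
```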
@@ -142,12 +127,6 @@ def train(emb_columns, number_columns, oneHot_columns, inputs, train_dataset):
# output_layer = FM(1)(deep)
model = tf.keras.Model(inputs, output_layer)
# model = tf.keras.Sequential([
# tf.keras.layers.DenseFeatures(columns)(inputs),
# tf.keras.layers.Dense(128, activation='relu')(inputs),
# tf.keras.layers.Dense(128, activation='relu')(inputs),
# tf.keras.layers.Dense(1, activation='sigmoid'),
# ])
# compile the model, set loss function, optimizer and evaluation metrics
model.compile(
@@ -212,8 +191,11 @@ if __name__ == '__main__':
# get the training columns
columns = df_train.columns.tolist()
print("原始数据列:")
print(columns)
emb_columns,number_columns,oneHot_columns, datasColumns,inputs = getTrainColumns(columns, data_vocab)
print("训练列:")
print(datasColumns)
df_train = df_train[datasColumns + ["label"]]
df_test = df_test[datasColumns + ["label"]]
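After restricting df_train and df_test to the selected training columns plus the label, the frames still need to be converted for Keras. A hedged sketch (assumed helper, not shown in this diff) of the standard dict-of-columns conversion of a pandas DataFrame into a tf.data.Dataset:

```python
import tensorflow as tf

def dfToDatasetSketch(df, shuffle=True, batch_size=256):
    # split off the label and feed the remaining columns as a feature dict
    df = df.copy()
    labels = df.pop("label")
    ds = tf.data.Dataset.from_tensor_slices((dict(df), labels))
    if shuffle:
        ds = ds.shuffle(buffer_size=len(df))
    return ds.batch(batch_size)
```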