Commit d7563b4d authored by 郭羽

美购精排模型耗时优化

parent 7dfd0d48
......@@ -306,14 +306,15 @@ def featuresToRedis(samples,columns,prefix,redisKey):
timestampCol = idCol+"_timestamp"
def toRedis(datas):
    """Write one partition of feature rows to Redis as JSON strings.

    Intended to run on Spark executors (e.g. via foreachPartition), so the
    Redis connection is created inside this function rather than captured
    from the driver — a driver-side connection cannot be pickled to workers.

    Relies on enclosing-scope names from featuresToRedis: ``idCol`` (name of
    the key column), ``redisKey`` (key prefix) and ``connUtils`` (project
    Redis helper) — assumed available; confirm against the enclosing function.

    Args:
        datas: iterable of Spark Row objects; each must contain ``idCol``.
    """
    # NOTE from original code: the utils module path may not be resolvable
    # on executors ("utils路径无法找到情况"), hence the local connection setup.
    conn = connUtils.getRedisConn()
    for d in datas:
        k = d[idCol]
        # ensure_ascii=False keeps non-ASCII (Chinese) feature values readable.
        v = json.dumps(d.asDict(), ensure_ascii=False)
        newKey = redisKey + k
        # NOTE(review): printing every key/value is expensive at scale —
        # likely leftover debugging; consider removing.
        print(newKey, v)
        conn.set(newKey, v)
        conn.expire(newKey, 60 * 60 * 24 * 7)  # TTL: 7 days
#根据timestamp获取每个user最新的记录
prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol))
......@@ -682,47 +683,48 @@ if __name__ == '__main__':
print("collect feature for item:{}".format(str(item_columns)))
timestmp4 = int(round(time.time()))

# ---- user features: collect on the driver, then write to Redis ----
# This commit replaces the executor-side featuresToRedis(...) path with
# collectFeaturesToDict + featureToRedis to reduce job time.
print("user feature to redis...")
userDatas = collectFeaturesToDict(samplesWithUserFeatures, user_columns, "user")
# BUGFIX: the elapsed-time subtraction must happen INSIDE format(); the
# original placed it outside, subtracting an int from the formatted string
# (TypeError). time.time() is in seconds, so the label is "s", not "ms".
print("user feature to collect 耗时s:{}".format(int(round(time.time())) - timestmp4))
timestmp4 = int(round(time.time()))
featureToRedis(FEATURE_USER_KEY, userDatas)
timestmp5 = int(round(time.time()))
print("user feature to redis 耗时s:{}".format(timestmp5 - timestmp4))
# ---- item features: collect on the driver, then write to Redis ----
# Same driver-side collect path as the user features (this commit's
# latency optimization); the old featuresToRedis(...) call is retired.
print("item feature to redis...")
itemDatas = collectFeaturesToDict(samplesWithUserFeatures, item_columns, "item")
# BUGFIX: ')' moved so the subtraction happens inside format(); the original
# evaluated "<string> - <int>" (TypeError). Unit is seconds, so label is "s".
print("item feature to collect 耗时s:{}".format(int(round(time.time())) - timestmp5))
timestmp5 = int(round(time.time()))
featureToRedis(FEATURE_ITEM_KEY, itemDatas)
timestmp6 = int(round(time.time()))
print("item feature to redis 耗时s:{}".format(timestmp6 - timestmp5))
# ---- model columns: persist the feature-column list to Redis ----
print("model columns to redis...")
model_columns = user_columns + item_columns
featureColumnsToRedis(model_columns)

# ---- training samples: select feature + label columns, write to HDFS ----
train_columns = model_columns + ["label", "timestamp"]
trainSamples = samplesWithUserFeatures.select(*train_columns)
print("write to hdfs start...")
# Split train/test at midnight of endDay (expected format "%Y%m%d").
splitTimestamp = int(time.mktime(time.strptime(endDay, "%Y%m%d")))
splitAndSaveTrainingTestSamplesByTimeStamp(trainSamples, splitTimestamp, TRAIN_FILE_PATH)
print("write to hdfs success...")
timestmp7 = int(round(time.time()))
print("数据写入hdfs 耗时s:{}".format(timestmp7 - timestmp6))

# ---- vocabulary of discrete feature values -> JSON -> Redis ----
print("数据字典生成...")
dataVocab = getDataVocab(samplesWithUserFeatures)
timestmp8 = int(round(time.time()))
print("数据字典生成 耗时s:{}".format(timestmp8 - timestmp7))
print("数据字典存入redis...")
dataVocabStr = json.dumps(dataVocab, ensure_ascii=False)
dataVocabToRedis(dataVocabStr)
timestmp9 = int(round(time.time()))
# NOTE(review): labeled 总耗时 ("total") but only measures the vocab-to-redis
# step (timestmp9 - timestmp8) — confirm whether total time was intended.
print("总耗时s:{}".format(timestmp9 - timestmp8))

spark.stop()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment