Commit a7735bd8 authored by 郭羽's avatar 郭羽

service model 优化

parent ebe58143
...@@ -303,17 +303,6 @@ def featureToRedis(key,datas): ...@@ -303,17 +303,6 @@ def featureToRedis(key,datas):
conn.set(newKey,v) conn.set(newKey,v)
conn.expire(newKey, 60 * 60 * 24 * 7) conn.expire(newKey, 60 * 60 * 24 * 7)
def collectFeaturesToDict(samples, columns, prefix):
    """Collect the most recent feature row per id into a {id: json_str} dict.

    For each distinct ``<prefix>id`` in ``samples``, keep only the row(s)
    whose ``timestamp`` equals that id's maximum, project them onto
    ``columns``, and serialize each row to a JSON string.

    Args:
        samples: Spark DataFrame containing at least ``<prefix>id`` and a
            ``timestamp`` column.  # assumes 'timestamp' is numeric/comparable
        columns: column names to keep in the serialized output; must include
            ``<prefix>id``.
        prefix: id-column prefix, e.g. "user" or "item".

    Returns:
        dict mapping id value -> JSON string of the row's fields
        (non-ASCII preserved).
    """
    id_col = prefix + "id"
    ts_col = id_col + "_timestamp"
    # Per-id maximum timestamp — identifies each id's latest record.
    latest_ts = samples.groupBy(id_col).agg(F.max("timestamp").alias(ts_col))
    # Join the max back onto the full set, then keep only rows at the max.
    joined = samples.join(latest_ts, on=[id_col], how='left')
    rows = (
        joined
        .where(F.col("timestamp") == F.col(ts_col))
        .select(*columns)
        .distinct()
        .collect()
    )
    print(prefix, len(rows))
    # If several rows tie on the max timestamp, the last collected row wins.
    return {row[id_col]: json.dumps(row.asDict(), ensure_ascii=False) for row in rows}
def featuresToRedis(samples,columns,prefix,redisKey): def featuresToRedis(samples,columns,prefix,redisKey):
idCol = prefix+"id" idCol = prefix+"id"
timestampCol = idCol+"_timestamp" timestampCol = idCol+"_timestamp"
...@@ -818,43 +807,31 @@ if __name__ == '__main__': ...@@ -818,43 +807,31 @@ if __name__ == '__main__':
print("数据字典save...") print("数据字典save...")
print("dataVocab:", str(dataVocab.keys())) print("dataVocab:", str(dataVocab.keys()))
# vocab_path = "../vocab/{}_vocab.json".format(VERSION) vocab_path = "../vocab/{}_vocab.json".format(VERSION)
# dataVocabStr = json.dumps(dataVocab, ensure_ascii=False) dataVocabStr = json.dumps(dataVocab, ensure_ascii=False)
# open(configUtils.VOCAB_PATH, mode='w', encoding='utf-8').write(dataVocabStr) open(configUtils.VOCAB_PATH, mode='w', encoding='utf-8').write(dataVocabStr)
#
# """特征数据存入redis======================================""" """特征数据存入redis======================================"""
# # user特征数据存入redis # user特征数据存入redis
# featuresToRedis(samplesWithUserFeatures, user_columns, "user", FEATURE_USER_KEY) featuresToRedis(samplesWithUserFeatures, user_columns, "user", FEATURE_USER_KEY)
# timestmp5 = int(round(time.time())) timestmp5 = int(round(time.time()))
# print("user feature to redis 耗时s:{}".format(timestmp5 - timestmp3)) print("user feature to redis 耗时s:{}".format(timestmp5 - timestmp3))
# # userDatas = collectFeaturesToDict(samplesWithUserFeatures, user_columns, "user")
# # featureToRedis(FEATURE_USER_KEY, userDatas) # item特征数据存入redis
# # itemDatas = collectFeaturesToDict(samplesWithUserFeatures, item_columns, "item") featuresToRedis(samplesWithUserFeatures, item_columns, "item", FEATURE_ITEM_KEY)
# # featureToRedis(FEATURE_ITEM_KEY, itemDatas) timestmp6 = int(round(time.time()))
# print("item feature to redis 耗时s:{}".format(timestmp6 - timestmp5))
# # item特征数据存入redis
# # todo 添加最近一个月有行为的item,待优化:扩大item范围 """训练数据保存 ======================================"""
# featuresToRedis(samplesWithUserFeatures, item_columns, "item", FEATURE_ITEM_KEY) timestmp3 = int(round(time.time()))
# timestmp6 = int(round(time.time())) train_columns = model_columns + ["label", "timestamp", "rating"]
# print("item feature to redis 耗时s:{}".format(timestmp6 - timestmp5)) trainSamples = samplesWithUserFeatures.select(*train_columns)
# train_df = trainSamples.toPandas()
# """训练数据保存 ======================================"""
# timestmp3 = int(round(time.time()))
# train_columns = model_columns + ["label", "timestamp", "rating"]
# trainSamples = samplesWithUserFeatures.select(*train_columns)
# print("write to hdfs start...")
# splitTimestamp = int(time.mktime(time.strptime(addDays(0), "%Y%m%d")))
# splitAndSaveTrainingTestSamplesByTimeStamp(trainSamples, splitTimestamp, TRAIN_FILE_PATH)
# print("write to hdfs success...")
# timestmp4 = int(round(time.time()))
# print("数据写入hdfs 耗时s:{}".format(timestmp4 - timestmp3))
#
# print("总耗时m:{}".format((timestmp4 - start)/60))
#
train_df = samplesWithUserFeatures.toPandas()
train_df = pd.DataFrame(train_df) train_df = pd.DataFrame(train_df)
train_df.to_csv("/tmp/service_{}.csv".format(endDay)) train_df.to_csv("/tmp/service_{}.csv".format(endDay))
print("训练数据写入success") timestmp4 = int(round(time.time()))
print("训练数据写入success 耗时s:{}".format(timestmp4 - timestmp3))
print("总耗时m:{}".format((timestmp4 - start)/60))
spark.stop() spark.stop()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment