Commit 62698495 authored by 郭羽

update feature

parent dc65d8fe
...@@ -781,7 +781,6 @@ if __name__ == '__main__': ...@@ -781,7 +781,6 @@ if __name__ == '__main__':
negCount = ratingSamplesWithLabel.filter(F.col("label")==0).count() negCount = ratingSamplesWithLabel.filter(F.col("label")==0).count()
print("pos size:"+str(posCount),"neg size:"+str(negCount)) print("pos size:"+str(posCount),"neg size:"+str(negCount))
# 数据字典 # 数据字典
dataVocab = {} dataVocab = {}
multiVocab = {} multiVocab = {}
...@@ -805,14 +804,40 @@ if __name__ == '__main__': ...@@ -805,14 +804,40 @@ if __name__ == '__main__':
# item columns # item columns
item_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("item")] item_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("item")]
print("collect feature for item:{}".format(str(item_columns))) print("collect feature for item:{}".format(str(item_columns)))
# model columns
print("model columns to redis...")
model_columns = user_columns + item_columns
featureColumnsToRedis(model_columns)
print("数据字典save...")
print("dataVocab:", str(dataVocab.keys()))
vocab_path = "../vocab/{}_vocab.json".format(VERSION)
dataVocabStr = json.dumps(dataVocab, ensure_ascii=False)
open(configUtils.VOCAB_PATH, mode='w', encoding='utf-8').write(dataVocabStr)
# dataVocabToRedis(dataVocabStr)
"""训练数据保存 ======================================"""
timestmp3 = int(round(time.time()))
train_columns = model_columns + ["label", "timestamp"]
trainSamples = samplesWithUserFeatures.select(*train_columns)
print("write to hdfs start...")
splitTimestamp = int(time.mktime(time.strptime(addDays(0), "%Y%m%d")))
splitAndSaveTrainingTestSamplesByTimeStamp(trainSamples, splitTimestamp, TRAIN_FILE_PATH)
print("write to hdfs success...")
timestmp4 = int(round(time.time())) timestmp4 = int(round(time.time()))
print("数据写入hdfs 耗时s:{}".format(timestmp4 - timestmp3))
"""特征数据存入redis======================================"""
# user特征数据存入redis # user特征数据存入redis
featuresToRedis(samplesWithUserFeatures, user_columns, "user", FEATURE_USER_KEY) featuresToRedis(samplesWithUserFeatures, user_columns, "user", FEATURE_USER_KEY)
timestmp5 = int(round(time.time())) timestmp5 = int(round(time.time()))
print("user feature to redis 耗时s:{}".format(timestmp5-timestmp4)) print("user feature to redis 耗时s:{}".format(timestmp5 - timestmp4))
# userDatas = collectFeaturesToDict(samplesWithUserFeatures, user_columns, "user") # userDatas = collectFeaturesToDict(samplesWithUserFeatures, user_columns, "user")
# featureToRedis(FEATURE_USER_KEY, userDatas) # featureToRedis(FEATURE_USER_KEY, userDatas)
# itemDatas = collectFeaturesToDict(samplesWithUserFeatures, item_columns, "item")
# featureToRedis(FEATURE_ITEM_KEY, itemDatas)
# item特征数据存入redis # item特征数据存入redis
# todo 添加最近一个月有行为的item,待优化:扩大item范围 # todo 添加最近一个月有行为的item,待优化:扩大item范围
...@@ -820,38 +845,6 @@ if __name__ == '__main__': ...@@ -820,38 +845,6 @@ if __name__ == '__main__':
timestmp6 = int(round(time.time())) timestmp6 = int(round(time.time()))
print("item feature to redis 耗时s:{}".format(timestmp6 - timestmp5)) print("item feature to redis 耗时s:{}".format(timestmp6 - timestmp5))
# itemDatas = collectFeaturesToDict(samplesWithUserFeatures, item_columns, "item") print("总耗时m:{}".format((timestmp6 - start)/60))
# featureToRedis(FEATURE_ITEM_KEY, itemDatas)
# model columns
print("model columns to redis...")
model_columns = user_columns + item_columns
featureColumnsToRedis(model_columns)
train_columns = model_columns + ["label", "timestamp"]
trainSamples = samplesWithUserFeatures.select(*train_columns)
print("write to hdfs start...")
splitTimestamp = int(time.mktime(time.strptime(addDays(0), "%Y%m%d")))
splitAndSaveTrainingTestSamplesByTimeStamp(trainSamples, splitTimestamp, TRAIN_FILE_PATH)
print("write to hdfs success...")
timestmp7 = int(round(time.time()))
print("数据写入hdfs 耗时s:{}".format(timestmp7 - timestmp6))
# 离散数据字典生成
# print("数据字典生成...")
# dataVocab = getDataVocab(samplesWithUserFeatures,model_columns,dataVocab)
# timestmp8 = int(round(time.time()))
# print("数据字典生成 耗时s:{}".format(timestmp8 - timestmp7))
# 字典转为json 存入redis
print("数据字典save...")
print("dataVocab:")
print(dataVocab.keys())
vocab_path = "../vocab/{}_vocab.json".format(VERSION)
dataVocabStr = json.dumps(dataVocab, ensure_ascii=False)
open(configUtils.VOCAB_PATH,mode='w',encoding='utf-8').write(dataVocabStr)
# dataVocabToRedis(dataVocabStr)
timestmp9 = int(round(time.time()))
print("总耗时m:{}".format((timestmp9 - start)/60))
spark.stop() spark.stop()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment