Commit b3d1aaaa authored by 郭羽

美购精排模型

parent 599a8f58
...@@ -68,27 +68,28 @@ TRAIN_FILE_PATH = "service_feature_" + VERSION ...@@ -68,27 +68,28 @@ TRAIN_FILE_PATH = "service_feature_" + VERSION
def addItemFeatures(samples,itemDF): def addItemFeatures(samples,itemDF):
prefix = "item_" prefix = "item_"
itemDF = itemDF.withColumnRenamed("id", "itemid") itemDF = itemDF.withColumnRenamed("id", "itemid")
samples = samples.join(itemDF, on=['itemid'], how='left')
# 数据过滤:无医生
samples = samples.filter(col("doctor_id")!="-1")
# null处理 # null处理
for c in ITEM_NUMBER_COLUMNS: for c in ITEM_NUMBER_COLUMNS:
print("null count:",c,samples.filter(col(c).isNull()).count()) print("null count:",c,itemDF.filter(col(c).isNull()).count())
samples = samples.withColumn(prefix+c,when(col(c).isNull(),0).otherwise(col(c)).cast("float")).drop(c) itemDF = itemDF.withColumn(prefix+c,when(col(c).isNull(),0).otherwise(col(c)).cast("float")).drop(c)
for c in ITEM_CATE_COLUMNS: for c in ITEM_CATE_COLUMNS:
print("null count:", c, samples.filter(col(c).isNull()).count()) print("null count:", c, itemDF.filter(col(c).isNull()).count())
samples = samples.withColumn(prefix+c, F.when(F.col(c).isNull(), "-1").otherwise(F.col(c))).drop(c) itemDF = itemDF.withColumn(prefix+c, F.when(F.col(c).isNull(), "-1").otherwise(F.col(c))).drop(c)
# 离散特征处理 # 离散特征处理
for c, v in ITEM_MULTI_COLUMN_EXTRA_MAP.items(): for c, v in ITEM_MULTI_COLUMN_EXTRA_MAP.items():
print("null count:", c, samples.filter(col(c).isNull()).count()) print("null count:", c, itemDF.filter(col(c).isNull()).count())
samples = samples.withColumn(c, F.when(F.col(c).isNull(), "-1").otherwise(F.col(c))) itemDF = itemDF.withColumn(c, F.when(F.col(c).isNull(), "-1").otherwise(F.col(c)))
for i in range(1, v + 1): for i in range(1, v + 1):
new_c = prefix + c + "__" + str(i) new_c = prefix + c + "__" + str(i)
samples = samples.withColumn(new_c, F.split(F.col(c), ",")[i - 1]) itemDF = itemDF.withColumn(new_c, F.split(F.col(c), ",")[i - 1])
samples = samples.withColumn(new_c, F.when(F.col(new_c).isNull(), "-1").otherwise(F.col(new_c))) itemDF = itemDF.withColumn(new_c, F.when(F.col(new_c).isNull(), "-1").otherwise(F.col(new_c)))
samples = samples.join(itemDF, on=['itemid'], how='inner')
# 数据过滤:无医生
samples = samples.filter(col("doctor_id") != "-1")
# 统计特征处理 # 统计特征处理
staticFeatures = samples.groupBy('itemid').agg(F.count(F.lit(1)).alias('itemRatingCount'), staticFeatures = samples.groupBy('itemid').agg(F.count(F.lit(1)).alias('itemRatingCount'),
F.avg(F.col('rating')).alias('itemRatingAvg'), F.avg(F.col('rating')).alias('itemRatingAvg'),
...@@ -258,6 +259,8 @@ def getDataVocab(samples): ...@@ -258,6 +259,8 @@ def getDataVocab(samples):
if c.count(cc) > 0: if c.count(cc) > 0:
dataVocab[c] = v dataVocab[c] = v
print(c,len(dataVocab[c]))
return dataVocab return dataVocab
def dataVocabToRedis(dataVocab): def dataVocabToRedis(dataVocab):
...@@ -634,22 +637,12 @@ if __name__ == '__main__': ...@@ -634,22 +637,12 @@ if __name__ == '__main__':
timestmp1 = int(round(time.time())) timestmp1 = int(round(time.time()))
samplesWithItemFeatures = addItemFeatures(ratingSamplesWithLabel, itemDF) samplesWithItemFeatures = addItemFeatures(ratingSamplesWithLabel, itemDF)
timestmp2 = int(round(time.time())) timestmp2 = int(round(time.time()))
print("处理item特征 耗时s:{}".format(timestmp2 - timestmp1)) print("处理item特征,size:{} 耗时s:{}".format(samplesWithItemFeatures.count(),timestmp2 - timestmp1))
print("处理user特征...") print("处理user特征...")
samplesWithUserFeatures = addUserFeatures(samplesWithItemFeatures) samplesWithUserFeatures = addUserFeatures(samplesWithItemFeatures)
timestmp3 = int(round(time.time())) timestmp3 = int(round(time.time()))
print("处理user特征 耗时s:{}".format(timestmp3 - timestmp2)) print("处理user特征,size:{} 耗时s:{}".format(samplesWithUserFeatures.count(),timestmp3 - timestmp2))
# 离散数据字典生成
print("数据字典生成...")
dataVocab = getDataVocab(samplesWithUserFeatures)
timestmp4 = int(round(time.time()))
print("数据字典生成 耗时s:{}".format(timestmp4 - timestmp3))
# 字典转为json 存入redis
print("数据字典存入redis...")
dataVocabStr = json.dumps(dataVocab,ensure_ascii=False)
dataVocabToRedis(dataVocabStr)
# user columns # user columns
user_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("user")] user_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("user")]
...@@ -658,6 +651,8 @@ if __name__ == '__main__': ...@@ -658,6 +651,8 @@ if __name__ == '__main__':
item_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("item")] item_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("item")]
print("collect feature for item:{}".format(str(item_columns))) print("collect feature for item:{}".format(str(item_columns)))
timestmp4 = int(round(time.time()))
# user特征数据存入redis # user特征数据存入redis
print("user feature to redis...") print("user feature to redis...")
userDatas = collectFeaturesToDict(samplesWithUserFeatures, user_columns, "user") userDatas = collectFeaturesToDict(samplesWithUserFeatures, user_columns, "user")
...@@ -686,4 +681,14 @@ if __name__ == '__main__': ...@@ -686,4 +681,14 @@ if __name__ == '__main__':
timestmp7 = int(round(time.time())) timestmp7 = int(round(time.time()))
print("总耗时s:{}".format(timestmp7 - timestmp6)) print("总耗时s:{}".format(timestmp7 - timestmp6))
# 离散数据字典生成
print("数据字典生成...")
dataVocab = getDataVocab(samplesWithUserFeatures)
timestmp8 = int(round(time.time()))
print("数据字典生成 耗时s:{}".format(timestmp8 - timestmp7))
# 字典转为json 存入redis
print("数据字典存入redis...")
dataVocabStr = json.dumps(dataVocab, ensure_ascii=False)
dataVocabToRedis(dataVocabStr)
spark.stop() spark.stop()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment