Commit 67de0eae authored by 郭羽's avatar 郭羽

特征工程优化

parent fcfb12fe
......@@ -66,12 +66,16 @@ TRAIN_FILE_PATH = "service_feature_" + VERSION
ITEM_PREFIX = "item_"
def addItemFeatures(samples,itemDF):
def addItemFeatures(samples,itemDF,dataVocab,multiVocab):
itemDF = itemDF.withColumnRenamed("id", "itemid")
# 数据过滤:无医生
itemDF = itemDF.filter(col("doctor_id") != "-1")
# itemid
vocabList = collectColumnToVocab(itemDF, "itemid")
dataVocab["itemid"] = vocabList
# null处理
for c in ITEM_NUMBER_COLUMNS:
print("null count:",c,itemDF.filter(col(c).isNull()).count())
......@@ -80,16 +84,24 @@ def addItemFeatures(samples,itemDF):
for c in ITEM_CATE_COLUMNS:
print("null count:", c, itemDF.filter(col(c).isNull()).count())
itemDF = itemDF.withColumn(ITEM_PREFIX+c, F.when(F.col(c).isNull(), "-1").otherwise(F.col(c))).drop(c)
# 字典添加
dataVocab[ITEM_PREFIX+c] = collectColumnToVocab(itemDF,ITEM_PREFIX+c)
# 离散特征处理
for c, v in ITEM_MULTI_COLUMN_EXTRA_MAP.items():
print("null count:", c, itemDF.filter(col(c).isNull()).count())
itemDF = itemDF.withColumn(c, F.when(F.col(c).isNull(), "-1").otherwise(F.col(c)))
multiVocab[c] = collectMutiColumnToVocab(itemDF, c)
itemDF = itemDF.drop(c)
for i in range(1, v + 1):
new_c = ITEM_PREFIX + c + "__" + str(i)
itemDF = itemDF.withColumn(new_c, F.split(F.col(c), ",")[i - 1])
itemDF = itemDF.withColumn(new_c, F.when(F.col(new_c).isNull(), "-1").otherwise(F.col(new_c)))
dataVocab[new_c] = multiVocab[c]
samples = samples.join(itemDF, on=['itemid'], how='inner')
# 统计特征处理
print("统计特征处理...")
......@@ -121,6 +133,9 @@ def addItemFeatures(samples,itemDF):
for c in bucketColumns:
samples = samples.withColumn(c + "Bucket",F.col(c + "Bucket").cast("string"))
dataVocab[c + "Bucket"] = [str(float(i)) for i in range(11)]
samples.printSchema()
# samples.show(5, truncate=False)
......@@ -139,7 +154,9 @@ def arrayReverse(arr):
arr.reverse()
return arr
def addUserFeatures(samples):
def addUserFeatures(samples,dataVocab,multiVocab):
dataVocab["userid"] = collectColumnToVocab(samples, "userid")
extractTagsUdf = F.udf(extractTags, ArrayType(StringType()))
arrayReverseUdf = F.udf(arrayReverse, ArrayType(StringType()))
samples = samples.withColumnRenamed("cl_id","userid")
......@@ -150,6 +167,7 @@ def addUserFeatures(samples):
.withColumn("userPositiveHistory", arrayReverseUdf(F.col("userPositiveHistory")))
for i in range(1,11):
samples = samples.withColumn("userRatedHistory"+str(i), F.when(F.col("userPositiveHistory")[i-1].isNotNull(),F.col("userPositiveHistory")[i-1]).otherwise("-1"))
# dataVocab["userRatedHistory"+str(i)] = dataVocab["itemid"]
samples = samples.drop("userPositiveHistory")
# user历史点击分值统计
......@@ -166,6 +184,9 @@ def addUserFeatures(samples):
samples = samples.withColumn(new_col, extractTagsUdf(F.collect_list(when(F.col('label') == 1, F.col(c)).otherwise(F.lit(None))).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1))))
for i in range(1, v+1):
samples = samples.withColumn(new_col + "__" + str(i),F.when(F.col(new_col)[i - 1].isNotNull(), F.col(new_col)[i - 1]).otherwise("-1"))
dataVocab[new_col + "__" + str(i)] = multiVocab[c]
samples = samples.drop(new_col)
# .drop(c).drop(new_col)
......@@ -185,7 +206,9 @@ def addUserFeatures(samples):
samples = featurePipeline.fit(samples).transform(samples)
# 转string
for c in bucketColumns:
samples = samples.withColumn(c + "Bucket", F.col(c + "Bucket").cast("string"))
samples = samples.withColumn(c + "Bucket", F.col(c + "Bucket").cast("string")).drop(c)
dataVocab[c + "Bucket"] = [str(float(i)) for i in range(11)]
samples.printSchema()
# samples.show(5,truncate=False)
......@@ -224,6 +247,25 @@ def splitAndSaveTrainingTestSamplesByTimeStamp(samples,splitTimestamp, file_path
train.write.option("header", "true").option("delimiter", "|").mode('overwrite').csv(trainingSavePath)
test.write.option("header", "true").option("delimiter", "|").mode('overwrite').csv(testSavePath)
def collectColumnToVocab(samples,column):
    """Build a vocabulary list for a single-valued column.

    Collects the distinct values of *column* from *samples* (a Spark
    DataFrame), keeps the truthy ones as strings, and appends the
    sentinel "-1" used elsewhere in this file as the null/default token.

    Returns a list of unique string values (order unspecified).
    """
    rows = samples.select(column).distinct().collect()
    # Keep only non-null/non-empty values, stringified for the vocab.
    vocab = {str(row[column]) for row in rows if row[column]}
    vocab.add("-1")  # default token for null values
    return list(vocab)
def collectMutiColumnToVocab(samples,column):
    """Build a vocabulary list for a multi-valued (comma-separated) column.

    Collects the distinct values of *column* from *samples* (a Spark
    DataFrame), splits each non-null value on "," and gathers every tag,
    plus the sentinel "-1" used elsewhere in this file as the null/default
    token.

    Returns a list of unique tag strings (order unspecified).
    """
    rows = samples.select(column).distinct().collect()
    tags = set()
    for row in rows:
        value = row[column]
        if not value:
            # Null/empty cells contribute no tags.
            continue
        tags.update(value.split(","))
    tags.add("-1")  # default token for null values
    return list(tags)
def getDataVocab(samples,model_columns):
dataVocab = {}
......@@ -232,28 +274,13 @@ def getDataVocab(samples,model_columns):
# 多值特征
for c in ITEM_MULTI_COLUMN_EXTRA_MAP.keys():
print(c)
datas = samples.select(c).distinct().collect()
tagSet = set()
for d in datas:
if d[c]:
for tag in d[c].split(","):
tagSet.add(tag)
tagSet.add("-1") #空值默认
multiVocab[c] = list(tagSet)
multiVocab[c] = collectMutiColumnToVocab(samples,c)
samples = samples.drop(c)
# id类特征 和 类别特征
for c in ["userid","itemid"] + [ITEM_PREFIX + c for c in ITEM_CATE_COLUMNS]:
for c in ["userid"]:
print(c)
datas = samples.select(c).distinct().collect()
vocabSet = set()
for d in datas:
if d[c]:
vocabSet.add(str(d[c]))
vocabSet.add("-1") # 空值的默认
dataVocab[c] = list(vocabSet)
pass
dataVocab[c] = collectColumnToVocab(samples,c)
for c in model_columns:
# 判断是否以Bucket结尾
......@@ -758,14 +785,20 @@ if __name__ == '__main__':
print("添加label...")
ratingSamplesWithLabel = addSampleLabel(ratingDF)
# 数据字典
dataVocab = {}
multiVocab = {}
print("处理item特征...")
timestmp1 = int(round(time.time()))
samplesWithItemFeatures = addItemFeatures(ratingSamplesWithLabel, itemDF)
samplesWithItemFeatures = addItemFeatures(ratingSamplesWithLabel, itemDF, dataVocab,multiVocab)
timestmp2 = int(round(time.time()))
print("处理item特征, 耗时s:{}".format(timestmp2 - timestmp1))
print("multiVocab:")
print(multiVocab.keys())
print("处理user特征...")
samplesWithUserFeatures = addUserFeatures(samplesWithItemFeatures)
samplesWithUserFeatures = addUserFeatures(samplesWithItemFeatures,dataVocab,multiVocab)
timestmp3 = int(round(time.time()))
print("处理user特征, 耗时s:{}".format(timestmp3 - timestmp2))
......@@ -787,6 +820,7 @@ if __name__ == '__main__':
print("user feature to redis 耗时s:{}".format(timestmp5 - timestmp4))
# item特征数据存入redis
# todo 添加最近一个月有行为的item,待优化:扩大item范围
# featuresToRedis(samplesWithUserFeatures, item_columns, "item", FEATURE_ITEM_KEY)
itemDatas = collectFeaturesToDict(samplesWithUserFeatures, item_columns, "item")
print("item feature to collect 耗时s:{}".format(int(round(time.time())) - timestmp5))
......@@ -810,15 +844,18 @@ if __name__ == '__main__':
print("数据写入hdfs 耗时s:{}".format(timestmp7 - timestmp6))
# 离散数据字典生成
print("数据字典生成...")
dataVocab = getDataVocab(samplesWithUserFeatures,model_columns)
timestmp8 = int(round(time.time()))
print("数据字典生成 耗时s:{}".format(timestmp8 - timestmp7))
# print("数据字典生成...")
# dataVocab = getDataVocab(samplesWithUserFeatures,model_columns,dataVocab)
# timestmp8 = int(round(time.time()))
# print("数据字典生成 耗时s:{}".format(timestmp8 - timestmp7))
# 字典转为json 存入redis
print("数据字典存入redis...")
print("dataVocab:")
print(dataVocab.keys())
dataVocabStr = json.dumps(dataVocab, ensure_ascii=False)
dataVocabToRedis(dataVocabStr)
timestmp9 = int(round(time.time()))
print("总耗时s:{}".format(timestmp9 - timestmp8))
print("总耗时s:{}".format(timestmp9 - timestmp7))
spark.stop()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment