Commit d17ad197 authored by 郭羽's avatar 郭羽

美购精排模型

parent d518cb5c
# Root directory of the serviceRec application; used below to locate the Spark script.
path=/srv/apps/serviceRec
# First positional argument of this script.
# NOTE(review): presumably the number of days of data to process — confirm against the caller.
day_count=$1
# Content type; used below to build the HDFS directory name.
content_type="service"
# Load the serviceRec environment (presumably a Python virtualenv) into the current shell.
source /srv/envs/serviceRec/bin/activate
# Path of the Spark feature-engineering script under the application root.
pythonFile=${path}/spark/featureEng.py
#log_file=~/${content_type}_feature_csv_export.log
# Recursively remove the /${content_type}_feature_train directory from HDFS.
# NOTE(review): `-rmr` is deprecated in current Hadoop releases in favour of `-rm -r` —
# confirm the cluster's Hadoop version before switching.
/opt/hadoop/bin/hdfs dfs -rmr /${content_type}_feature_train
......
......@@ -128,14 +128,19 @@ def extractTags(genres_list):
sortedGenres = sorted(genres_dict.items(), key=lambda x: x[1], reverse=True)
return [x[0] for x in sortedGenres]
# The Spark SQL version in use does not support F.reverse, so the reversal
# is done by this plain-Python helper (wrapped as a UDF by its caller).
def arrayReverse(arr):
    """Return the elements of *arr* in reverse order as a new list.

    Args:
        arr: A sequence of values, or ``None``.

    Returns:
        A new list holding the elements of *arr* in reverse order, or
        ``None`` when *arr* is ``None``.
    """
    # A null array column reaches a Python UDF as None; pass it through
    # instead of raising AttributeError.
    if arr is None:
        return None
    # Build a new list rather than reversing in place, so the caller's
    # object is never mutated.
    return list(reversed(arr))
def addUserFeatures(samples):
extractTagsUdf = F.udf(extractTags, ArrayType(StringType()))
arrayReverseUdf = F.udf(arrayReverse, ArrayType(StringType()))
samples = samples.withColumnRenamed("cl_id","userid")
# user历史记录
samples = samples\
.withColumn('userPositiveHistory',F.collect_list(when(F.col('label') == 1, F.col('itemid')).otherwise(F.lit(None))).over(sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1))) \
.withColumn("userPositiveHistory", F.reverse(F.col("userPositiveHistory")))
.withColumn("userPositiveHistory", arrayReverseUdf(F.col("userPositiveHistory")))
for i in range(1,11):
samples = samples.withColumn("userRatedHistory"+str(i), F.when(F.col("userPositiveHistory")[i-1].isNotNull(),F.col("userPositiveHistory")[i-1]).otherwise("-1"))
samples = samples.drop("userPositiveHistory")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment