Commit 1f03ad55 authored by 郭羽's avatar 郭羽

service model 优化

parent dbc3d28f
......@@ -215,7 +215,7 @@ def addUserFeatures(samples,dataVocab,multiVocab):
return samples
def addSampleLabel(ratingSamples):
ratingSamples = ratingSamples.withColumn('label', when(F.col('rating') >= 1, 1).otherwise(0))
ratingSamples = ratingSamples.withColumn('label', when(F.col('rating') >= 5, 1).otherwise(0))
ratingSamples.show(5, truncate=False)
ratingSamples.printSchema()
return ratingSamples
......@@ -296,8 +296,9 @@ def userFeaturesToRedis(samples,columns,prefix,redisKey):
#根据timestamp获取每个user最新的记录
prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol))
resDatas = samples.join(prefixSamples, on=[idCol], how='left').where(F.col("timestamp") == F.col(timestampCol))
resDatas = resDatas.select(*columns).distinct()
resDatas = prefixSamples.join(samples, on=[idCol], how='inner').where(F.col("timestamp") == F.col(timestampCol))
resDatas = resDatas.select(*columns)
resDatas.show(10,truncate=False)
print(prefix, resDatas.count())
resDatas.repartition(8).foreachPartition(toRedis)
......@@ -317,7 +318,7 @@ def itemFeaturesToRedis(samples,itemDF,columns,redisKey):
item_static_columns = [idCol] + ["itemRatingCount_Bucket", "itemRatingAvg_Bucket", "itemClickCount_Bucket", "itemExpCount_Bucket","itemRatingStddev_number","itemCtr_number"]
#根据timestamp获取每个user最新的记录
prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol))
item_static_df = samples.join(prefixSamples, on=[idCol], how='left').where(F.col("timestamp") == F.col(timestampCol))
item_static_df = prefixSamples.join(samples, on=[idCol], how='inner').where(F.col("timestamp") == F.col(timestampCol))
item_static_df = item_static_df.select(*item_static_columns)
item_static_df.show(10,truncate=False)
......@@ -333,7 +334,7 @@ def itemFeaturesToRedis(samples,itemDF,columns,redisKey):
resDatas.show(10,truncate=False)
resDatas = resDatas.select(*columns).distinct()
resDatas = resDatas.select(*columns)
print("item size:",resDatas.count())
resDatas.repartition(8).foreachPartition(toRedis)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment