Commit b0fdcb93 authored by 郭羽's avatar 郭羽

service model 优化

parent 1f03ad55
...@@ -297,7 +297,7 @@ def userFeaturesToRedis(samples,columns,prefix,redisKey): ...@@ -297,7 +297,7 @@ def userFeaturesToRedis(samples,columns,prefix,redisKey):
#根据timestamp获取每个user最新的记录 #根据timestamp获取每个user最新的记录
prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol)) prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol))
resDatas = prefixSamples.join(samples, on=[idCol], how='inner').where(F.col("timestamp") == F.col(timestampCol)) resDatas = prefixSamples.join(samples, on=[idCol], how='inner').where(F.col("timestamp") == F.col(timestampCol))
resDatas = resDatas.select(*columns) resDatas = resDatas.select(*columns).distinct()
resDatas.show(10,truncate=False) resDatas.show(10,truncate=False)
print(prefix, resDatas.count()) print(prefix, resDatas.count())
resDatas.repartition(8).foreachPartition(toRedis) resDatas.repartition(8).foreachPartition(toRedis)
...@@ -334,7 +334,7 @@ def itemFeaturesToRedis(samples,itemDF,columns,redisKey): ...@@ -334,7 +334,7 @@ def itemFeaturesToRedis(samples,itemDF,columns,redisKey):
resDatas.show(10,truncate=False) resDatas.show(10,truncate=False)
resDatas = resDatas.select(*columns) resDatas = resDatas.select(*columns).distinct()
print("item size:",resDatas.count()) print("item size:",resDatas.count())
resDatas.repartition(8).foreachPartition(toRedis) resDatas.repartition(8).foreachPartition(toRedis)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment