Commit 9bd1b0ef authored by 郭羽's avatar 郭羽

service model 优化

parent 0cb723f3
...@@ -384,8 +384,6 @@ def userFeaturesToRedis(samples,columns,prefix,redisKey): ...@@ -384,8 +384,6 @@ def userFeaturesToRedis(samples,columns,prefix,redisKey):
def itemFeaturesToRedis(itemStaticDF,redisKey): def itemFeaturesToRedis(itemStaticDF,redisKey):
idCol = "item_id" idCol = "item_id"
timestampCol = "item_timestamp"
def toRedis(datas): def toRedis(datas):
conn = getRedisConn() conn = getRedisConn()
for d in datas: for d in datas:
...@@ -394,29 +392,6 @@ def itemFeaturesToRedis(itemStaticDF,redisKey): ...@@ -394,29 +392,6 @@ def itemFeaturesToRedis(itemStaticDF,redisKey):
newKey = redisKey + k newKey = redisKey + k
conn.set(newKey, v) conn.set(newKey, v)
conn.expire(newKey, 60 * 60 * 24 * 7) conn.expire(newKey, 60 * 60 * 24 * 7)
# item_static_columns = [idCol] + ["itemRatingCount_Bucket", "itemRatingAvg_Bucket", "itemClickCount_Bucket", "itemExpCount_Bucket","itemRatingStddev_number","itemCtr_number"]
#根据timestamp获取每个user最新的记录
# prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol))
# item_static_df = prefixSamples.join(samples, on=[idCol], how='inner').where(F.col("timestamp") == F.col(timestampCol))
# item_static_df = item_static_df.select(*item_static_columns)
# item_static_df.show(10,truncate=False)
# resDatas = itemDF.join(itemStaticDF, on=[idCol], how='left')
# item_static_columns = itemStaticDF.columns
#
# for col in item_static_columns:
# res = "0"
# if col.endswith("Bucket"):
# res = "0"
# if col.endswith("_number"):
# res = 0
# resDatas = resDatas.withColumn(col,F.when(F.col(col).isNull(), res).otherwise(F.col(col)))
#
# resDatas.show(10,truncate=False)
#
# resDatas = resDatas.select(*columns)
# print("item size:",resDatas.count())
itemStaticDF.repartition(8).foreachPartition(toRedis) itemStaticDF.repartition(8).foreachPartition(toRedis)
""" """
...@@ -840,7 +815,8 @@ if __name__ == '__main__': ...@@ -840,7 +815,8 @@ if __name__ == '__main__':
.withColumnRenamed("card_id", "item_id")\ .withColumnRenamed("card_id", "item_id")\
.withColumnRenamed("page_stay", "rating")\ .withColumnRenamed("page_stay", "rating")\
.withColumnRenamed("os", "user_os")\ .withColumnRenamed("os", "user_os")\
.withColumn("user_city_id", F.when(F.col("user_city_id").isNull(), "-1").otherwise(F.col("user_city_id"))) .withColumn("user_city_id", F.when(F.col("user_city_id").isNull(), "-1").otherwise(F.col("user_city_id")))\
.withColumn("timestamp",F.col("timestamp").cast("long"))
print(ratingDF.columns) print(ratingDF.columns)
print(ratingDF.show(10, truncate=False)) print(ratingDF.show(10, truncate=False))
...@@ -899,7 +875,7 @@ if __name__ == '__main__': ...@@ -899,7 +875,7 @@ if __name__ == '__main__':
user_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("user")] user_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("user")]
print("collect feature for user:{}".format(str(user_columns))) print("collect feature for user:{}".format(str(user_columns)))
# item columns # item columns
item_columns = itemStaticDF.columns item_columns = [c for c in itemStaticDF.columns if c.startswith("item")]
print("collect feature for item:{}".format(str(item_columns))) print("collect feature for item:{}".format(str(item_columns)))
# model columns # model columns
print("model columns to redis...") print("model columns to redis...")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment