Commit ef366e48 authored by 郭羽's avatar 郭羽

service model 优化

parent 59664d37
......@@ -81,7 +81,46 @@ def priceToBucket(num):
numberToBucketUdf = F.udf(numberToBucket, StringType())
priceToBucketUdf = F.udf(priceToBucket, StringType())
def addStaticsFeatures(samples,dataVocab):
def addItemStaticFeatures(samples,itemDF,dataVocab):
print("item统计特征处理...")
staticFeatures = samples.groupBy('item_id').agg(F.count(F.lit(1)).alias('itemRatingCount'),
F.avg(F.col('rating')).alias('itemRatingAvg'),
F.stddev(F.col('rating')).alias('itemRatingStddev'),
F.sum(when(F.col('label') == 1, F.lit(1)).otherwise(F.lit(0))).alias("itemClickCount"),
F.sum(when(F.col('label') == 0, F.lit(1)).otherwise(F.lit(0))).alias("itemExpCount")
).fillna(0) \
.withColumn('itemRatingStddev', F.format_number(F.col('itemRatingStddev'), NUMBER_PRECISION).cast("float")) \
.withColumn('itemRatingAvg', F.format_number(F.col('itemRatingAvg'), NUMBER_PRECISION).cast("float")) \
.withColumn('itemCtr',F.format_number(F.col("itemClickCount") / (F.col("itemExpCount") + 1), NUMBER_PRECISION).cast("float"))
staticFeatures.show(20, truncate=False)
staticFeatures = itemDF.join(itemStaticDF, on=["item_id"], how='left')
# 连续特征分桶
bucket_vocab = [str(i) for i in range(101)]
bucket_suffix = "_Bucket"
for col in ["itemRatingCount","itemRatingAvg", "itemClickCount", "itemExpCount"]:
new_col = col + bucket_suffix
staticFeatures = staticFeatures.withColumn(new_col, numberToBucketUdf(F.col(col))) \
.drop(col) \
.withColumn(new_col, F.when(F.col(new_col).isNull(), "0").otherwise(F.col(new_col)))
dataVocab[new_col] = bucket_vocab
# 方差处理
number_suffix = "_number"
for col in ["itemRatingStddev"]:
new_col = col + number_suffix
staticFeatures = staticFeatures.withColumn(new_col, F.when(F.col(col).isNull(), 0).otherwise(1 / (F.col(col) + 1))).drop(col)
for col in ["itemCtr"]:
new_col = col + number_suffix
staticFeatures = staticFeatures.withColumn(col, F.when(F.col(col).isNull(), 0).otherwise(F.col(col))).withColumnRenamed(col,new_col)
print("item size:", staticFeatures.count())
return staticFeatures
def addUserStaticsFeatures(samples,dataVocab):
print("user统计特征处理...")
samples = samples \
.withColumn('userRatingCount',F.format_number(F.sum(F.lit(1)).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)), NUMBER_PRECISION).cast("float")) \
......@@ -92,21 +131,14 @@ def addStaticsFeatures(samples,dataVocab):
.withColumn("userCtr", F.format_number(F.col("userClickCount")/(F.col("userExpCount")+1),NUMBER_PRECISION).cast("float")) \
.filter(F.col("userRatingCount") > 1)
print("item统计特征处理...")
samples = samples \
.withColumn('itemRatingCount',F.format_number(F.sum(F.lit(1)).over(sql.Window.partitionBy('item_id').orderBy('timestamp').rowsBetween(-100, -1)), NUMBER_PRECISION).cast("float")) \
.withColumn("itemRatingAvg", F.format_number(F.avg(F.col("rating")).over(sql.Window.partitionBy('item_id').orderBy('timestamp').rowsBetween(-100, -1)), NUMBER_PRECISION).cast("float")) \
.withColumn("itemRatingStddev", F.format_number(F.stddev(F.col("rating")).over(sql.Window.partitionBy('item_id').orderBy('timestamp').rowsBetween(-100, -1)),NUMBER_PRECISION).cast("float")) \
.withColumn("itemClickCount", F.format_number(F.sum(when(F.col('label') == 1, F.lit(1)).otherwise(F.lit(0))).over(sql.Window.partitionBy("item_id").orderBy(F.col("timestamp")).rowsBetween(-100, -1)),NUMBER_PRECISION).cast("float")) \
.withColumn("itemExpCount", F.format_number(F.sum(when(F.col('label') == 0, F.lit(1)).otherwise(F.lit(0))).over(sql.Window.partitionBy("item_id").orderBy(F.col("timestamp")).rowsBetween(-100, -1)),NUMBER_PRECISION).cast("float")) \
.withColumn("itemCtr", F.format_number(F.col("itemClickCount")/(F.col("itemExpCount")+1),NUMBER_PRECISION).cast("float")) \
samples.show(20, truncate=False)
# 连续特征分桶
bucket_vocab = [str(i) for i in range(101)]
bucket_suffix = "_Bucket"
for col in ["userRatingCount","userRatingAvg","userClickCount","userExpCount","itemRatingCount","itemRatingAvg","itemClickCount","itemExpCount"]:
for col in ["userRatingCount","userRatingAvg","userClickCount","userExpCount"]:
new_col = col + bucket_suffix
samples = samples.withColumn(new_col, numberToBucketUdf(F.col(col)))\
.drop(col)\
......@@ -115,10 +147,10 @@ def addStaticsFeatures(samples,dataVocab):
# 方差处理
number_suffix = "_number"
for col in ["userRatingStddev","itemRatingStddev"]:
for col in ["userRatingStddev"]:
new_col = col + number_suffix
samples = samples.withColumn(new_col,F.when(F.col(col).isNull(),0).otherwise(1/(F.col(col)+1))).drop(col)
for col in ["userCtr", "itemCtr"]:
for col in ["userCtr"]:
new_col = col + number_suffix
samples = samples.withColumn(col, F.when(F.col(col).isNull(), 0).otherwise(F.col(col))).withColumnRenamed(col, new_col)
......@@ -201,6 +233,7 @@ def addUserFeatures(samples,dataVocab,multiVocab):
samples = samples.drop("userPositiveHistory")
# user偏好
print("user 偏好数据")
for c,v in multiVocab.items():
new_col = "user" + "__" + c
samples = samples.withColumn(new_col, extractTagsUdf(F.collect_list(when(F.col('label') == 1, F.col(c)).otherwise(F.lit(None))).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1))))
......@@ -210,6 +243,51 @@ def addUserFeatures(samples,dataVocab,multiVocab):
samples = samples.drop(new_col).drop(c)
print("user统计特征处理...")
samples = samples \
.withColumn('userRatingCount', F.format_number(
F.sum(F.lit(1)).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),
NUMBER_PRECISION).cast("float")) \
.withColumn("userRatingAvg", F.format_number(
F.avg(F.col("rating")).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),
NUMBER_PRECISION).cast("float")) \
.withColumn("userRatingStddev", F.format_number(
F.stddev(F.col("rating")).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),
NUMBER_PRECISION).cast("float")) \
.withColumn("userClickCount", F.format_number(
F.sum(when(F.col('label') == 1, F.lit(1)).otherwise(F.lit(0))).over(
sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1)), NUMBER_PRECISION).cast(
"float")) \
.withColumn("userExpCount", F.format_number(F.sum(when(F.col('label') == 0, F.lit(1)).otherwise(F.lit(0))).over(
sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1)), NUMBER_PRECISION).cast(
"float")) \
.withColumn("userCtr",
F.format_number(F.col("userClickCount") / (F.col("userExpCount") + 1), NUMBER_PRECISION).cast(
"float")) \
.filter(F.col("userRatingCount") > 1)
samples.show(10, truncate=False)
# 连续特征分桶
bucket_vocab = [str(i) for i in range(101)]
bucket_suffix = "_Bucket"
for col in ["userRatingCount", "userRatingAvg", "userClickCount", "userExpCount"]:
new_col = col + bucket_suffix
samples = samples.withColumn(new_col, numberToBucketUdf(F.col(col))) \
.drop(col) \
.withColumn(new_col, F.when(F.col(new_col).isNull(), "0").otherwise(F.col(new_col)))
dataVocab[new_col] = bucket_vocab
# 方差处理
number_suffix = "_number"
for col in ["userRatingStddev"]:
new_col = col + number_suffix
samples = samples.withColumn(new_col, F.when(F.col(col).isNull(), 0).otherwise(1 / (F.col(col) + 1))).drop(col)
for col in ["userCtr"]:
new_col = col + number_suffix
samples = samples.withColumn(col, F.when(F.col(col).isNull(), 0).otherwise(F.col(col))).withColumnRenamed(col,
new_col)
samples.printSchema()
samples.show(10,truncate=False)
return samples
......@@ -302,7 +380,7 @@ def userFeaturesToRedis(samples,columns,prefix,redisKey):
print(prefix, resDatas.count())
resDatas.repartition(8).foreachPartition(toRedis)
def itemFeaturesToRedis(samples,itemDF,columns,redisKey):
def itemFeaturesToRedis(itemStaticDF,redisKey):
idCol = "item_id"
timestampCol = "item_timestamp"
......@@ -315,28 +393,29 @@ def itemFeaturesToRedis(samples,itemDF,columns,redisKey):
conn.set(newKey, v)
conn.expire(newKey, 60 * 60 * 24 * 7)
item_static_columns = [idCol] + ["itemRatingCount_Bucket", "itemRatingAvg_Bucket", "itemClickCount_Bucket", "itemExpCount_Bucket","itemRatingStddev_number","itemCtr_number"]
# item_static_columns = [idCol] + ["itemRatingCount_Bucket", "itemRatingAvg_Bucket", "itemClickCount_Bucket", "itemExpCount_Bucket","itemRatingStddev_number","itemCtr_number"]
#根据timestamp获取每个user最新的记录
prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol))
item_static_df = prefixSamples.join(samples, on=[idCol], how='inner').where(F.col("timestamp") == F.col(timestampCol))
item_static_df = item_static_df.select(*item_static_columns)
item_static_df.show(10,truncate=False)
resDatas = itemDF.join(item_static_df, on=[idCol], how='left')
for col in item_static_columns:
res = "0"
if col.endswith("Bucket"):
res = "0"
if col.endswith("_number"):
res = 0
resDatas = resDatas.withColumn(col,F.when(F.col(col).isNull(), res).otherwise(F.col(col)))
resDatas.show(10,truncate=False)
# prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol))
# item_static_df = prefixSamples.join(samples, on=[idCol], how='inner').where(F.col("timestamp") == F.col(timestampCol))
# item_static_df = item_static_df.select(*item_static_columns)
# item_static_df.show(10,truncate=False)
resDatas = resDatas.select(*columns).distinct()
print("item size:",resDatas.count())
resDatas.repartition(8).foreachPartition(toRedis)
# resDatas = itemDF.join(itemStaticDF, on=[idCol], how='left')
# item_static_columns = itemStaticDF.columns
#
# for col in item_static_columns:
# res = "0"
# if col.endswith("Bucket"):
# res = "0"
# if col.endswith("_number"):
# res = 0
# resDatas = resDatas.withColumn(col,F.when(F.col(col).isNull(), res).otherwise(F.col(col)))
#
# resDatas.show(10,truncate=False)
#
# resDatas = resDatas.select(*columns)
# print("item size:",resDatas.count())
itemStaticDF.repartition(8).foreachPartition(toRedis)
"""
数据加载
......@@ -801,8 +880,11 @@ if __name__ == '__main__':
itemDF_spark.printSchema()
itemDF_spark.show(10, truncate=False)
#user统计特征处理
itemStaticDF = addItemStaticFeatures(ratingSamplesWithLabel,itemDF_spark,dataVocab)
# 统计数据处理
ratingSamplesWithLabel = addStaticsFeatures(ratingSamplesWithLabel,dataVocab)
# ratingSamplesWithLabel = addStaticsFeatures(ratingSamplesWithLabel,dataVocab)
samples = ratingSamplesWithLabel.join(itemDF_spark, on=['item_id'], how='inner')
......@@ -815,7 +897,7 @@ if __name__ == '__main__':
user_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("user")]
print("collect feature for user:{}".format(str(user_columns)))
# item columns
item_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("item")]
item_columns = addItemStaticFeatures.columns
print("collect feature for item:{}".format(str(item_columns)))
# model columns
print("model columns to redis...")
......@@ -828,16 +910,16 @@ if __name__ == '__main__':
dataVocabStr = json.dumps(dataVocab, ensure_ascii=False)
open(configUtils.VOCAB_PATH, mode='w', encoding='utf-8').write(dataVocabStr)
# item特征数据存入redis
itemFeaturesToRedis(itemStaticDF, FEATURE_ITEM_KEY)
timestmp6 = int(round(time.time()))
print("item feature to redis 耗时s:{}".format(timestmp6 - timestmp3))
"""特征数据存入redis======================================"""
# user特征数据存入redis
userFeaturesToRedis(samplesWithUserFeatures, user_columns, "user", FEATURE_USER_KEY)
timestmp5 = int(round(time.time()))
print("user feature to redis 耗时s:{}".format(timestmp5 - timestmp3))
# item特征数据存入redis
itemFeaturesToRedis(samplesWithUserFeatures,itemDF_spark,item_columns, FEATURE_ITEM_KEY)
timestmp6 = int(round(time.time()))
print("item feature to redis 耗时s:{}".format(timestmp6 - timestmp5))
print("user feature to redis 耗时s:{}".format(timestmp5 - timestmp6))
"""训练数据保存 ======================================"""
timestmp3 = int(round(time.time()))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment