From ef366e48cf4e0c86b980b7401ab83b144b6e0f01 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=83=AD=E7=BE=BD?= <guoyu@igengmei.com>
Date: Fri, 30 Jul 2021 19:41:17 +0800
Subject: [PATCH] =?UTF-8?q?service=20model=20=E4=BC=98=E5=8C=96?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 spark/featureEng2.py | 164 ++++++++++++++++++++++++++++++++-----------
 1 file changed, 123 insertions(+), 41 deletions(-)

diff --git a/spark/featureEng2.py b/spark/featureEng2.py
index 0acd938..1124452 100644
--- a/spark/featureEng2.py
+++ b/spark/featureEng2.py
@@ -81,7 +81,46 @@ def priceToBucket(num):
 numberToBucketUdf = F.udf(numberToBucket, StringType())
 priceToBucketUdf = F.udf(priceToBucket, StringType())
 
-def addStaticsFeatures(samples,dataVocab):
+def addItemStaticFeatures(samples,itemDF,dataVocab):
+    print("item统计特征处理...")
+    staticFeatures = samples.groupBy('item_id').agg(F.count(F.lit(1)).alias('itemRatingCount'),
+                                                    F.avg(F.col('rating')).alias('itemRatingAvg'),
+                                                    F.stddev(F.col('rating')).alias('itemRatingStddev'),
+                                                    F.sum(when(F.col('label') == 1, F.lit(1)).otherwise(F.lit(0))).alias("itemClickCount"),
+                                                    F.sum(when(F.col('label') == 0, F.lit(1)).otherwise(F.lit(0))).alias("itemExpCount")
+                                                    ).fillna(0) \
+        .withColumn('itemRatingStddev', F.format_number(F.col('itemRatingStddev'), NUMBER_PRECISION).cast("float")) \
+        .withColumn('itemRatingAvg', F.format_number(F.col('itemRatingAvg'), NUMBER_PRECISION).cast("float")) \
+        .withColumn('itemCtr',F.format_number(F.col("itemClickCount") / (F.col("itemExpCount") + 1), NUMBER_PRECISION).cast("float"))
+
+    staticFeatures.show(20, truncate=False)
+
+    staticFeatures = itemDF.join(staticFeatures, on=["item_id"], how='left')
+
+    # 连续特征分桶
+    bucket_vocab = [str(i) for i in range(101)]
+    bucket_suffix = "_Bucket"
+    for col in ["itemRatingCount","itemRatingAvg", "itemClickCount", "itemExpCount"]:
+        new_col = col + bucket_suffix
+        staticFeatures = staticFeatures.withColumn(new_col, numberToBucketUdf(F.col(col))) \
+            .drop(col) \
+            .withColumn(new_col, F.when(F.col(new_col).isNull(), "0").otherwise(F.col(new_col)))
+        dataVocab[new_col] = bucket_vocab
+
+    # 方差处理
+    number_suffix = "_number"
+    for col in ["itemRatingStddev"]:
+        new_col = col + number_suffix
+        staticFeatures = staticFeatures.withColumn(new_col, F.when(F.col(col).isNull(), 0).otherwise(1 / (F.col(col) + 1))).drop(col)
+    for col in ["itemCtr"]:
+        new_col = col + number_suffix
+        staticFeatures = staticFeatures.withColumn(col, F.when(F.col(col).isNull(), 0).otherwise(F.col(col))).withColumnRenamed(col,new_col)
+
+    print("item size:", staticFeatures.count())
+
+    return staticFeatures
+
+def addUserStaticsFeatures(samples,dataVocab):
     print("user统计特征处理...")
     samples = samples \
         .withColumn('userRatingCount',F.format_number(F.sum(F.lit(1)).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)), NUMBER_PRECISION).cast("float")) \
@@ -92,21 +131,14 @@ def addStaticsFeatures(samples,dataVocab):
         .withColumn("userCtr", F.format_number(F.col("userClickCount")/(F.col("userExpCount")+1),NUMBER_PRECISION).cast("float")) \
         .filter(F.col("userRatingCount") > 1)
 
-    print("item统计特征处理...")
-    samples = samples \
-        .withColumn('itemRatingCount',F.format_number(F.sum(F.lit(1)).over(sql.Window.partitionBy('item_id').orderBy('timestamp').rowsBetween(-100, -1)), NUMBER_PRECISION).cast("float")) \
-        .withColumn("itemRatingAvg", F.format_number(F.avg(F.col("rating")).over(sql.Window.partitionBy('item_id').orderBy('timestamp').rowsBetween(-100, -1)), NUMBER_PRECISION).cast("float")) \
-        .withColumn("itemRatingStddev", F.format_number(F.stddev(F.col("rating")).over(sql.Window.partitionBy('item_id').orderBy('timestamp').rowsBetween(-100, -1)),NUMBER_PRECISION).cast("float")) \
-        .withColumn("itemClickCount", F.format_number(F.sum(when(F.col('label') == 1, F.lit(1)).otherwise(F.lit(0))).over(sql.Window.partitionBy("item_id").orderBy(F.col("timestamp")).rowsBetween(-100, -1)),NUMBER_PRECISION).cast("float")) \
-        .withColumn("itemExpCount", F.format_number(F.sum(when(F.col('label') == 0, F.lit(1)).otherwise(F.lit(0))).over(sql.Window.partitionBy("item_id").orderBy(F.col("timestamp")).rowsBetween(-100, -1)),NUMBER_PRECISION).cast("float")) \
-        .withColumn("itemCtr", F.format_number(F.col("itemClickCount")/(F.col("itemExpCount")+1),NUMBER_PRECISION).cast("float")) \
 
     samples.show(20, truncate=False)
 
+
     # 连续特征分桶
     bucket_vocab = [str(i) for i in range(101)]
     bucket_suffix = "_Bucket"
-    for col in ["userRatingCount","userRatingAvg","userClickCount","userExpCount","itemRatingCount","itemRatingAvg","itemClickCount","itemExpCount"]:
+    for col in ["userRatingCount","userRatingAvg","userClickCount","userExpCount"]:
         new_col = col + bucket_suffix
         samples = samples.withColumn(new_col, numberToBucketUdf(F.col(col)))\
             .drop(col)\
@@ -115,10 +147,10 @@ def addStaticsFeatures(samples,dataVocab):
 
     # 方差处理
     number_suffix = "_number"
-    for col in ["userRatingStddev","itemRatingStddev"]:
+    for col in ["userRatingStddev"]:
         new_col = col + number_suffix
         samples = samples.withColumn(new_col,F.when(F.col(col).isNull(),0).otherwise(1/(F.col(col)+1))).drop(col)
-    for col in ["userCtr", "itemCtr"]:
+    for col in ["userCtr"]:
         new_col = col + number_suffix
         samples = samples.withColumn(col, F.when(F.col(col).isNull(), 0).otherwise(F.col(col))).withColumnRenamed(col, new_col)
 
@@ -201,6 +233,7 @@ def addUserFeatures(samples,dataVocab,multiVocab):
     samples = samples.drop("userPositiveHistory")
 
     # user偏好
+    print("user 偏好数据")
     for c,v in multiVocab.items():
         new_col = "user" + "__" + c
         samples = samples.withColumn(new_col, extractTagsUdf(F.collect_list(when(F.col('label') == 1, F.col(c)).otherwise(F.lit(None))).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1))))
@@ -210,6 +243,51 @@ def addUserFeatures(samples,dataVocab,multiVocab):
 
         samples = samples.drop(new_col).drop(c)
 
+    print("user统计特征处理...")
+    samples = samples \
+        .withColumn('userRatingCount', F.format_number(
+        F.sum(F.lit(1)).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),
+        NUMBER_PRECISION).cast("float")) \
+        .withColumn("userRatingAvg", F.format_number(
+        F.avg(F.col("rating")).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),
+        NUMBER_PRECISION).cast("float")) \
+        .withColumn("userRatingStddev", F.format_number(
+        F.stddev(F.col("rating")).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),
+        NUMBER_PRECISION).cast("float")) \
+        .withColumn("userClickCount", F.format_number(
+        F.sum(when(F.col('label') == 1, F.lit(1)).otherwise(F.lit(0))).over(
+            sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1)), NUMBER_PRECISION).cast(
+        "float")) \
+        .withColumn("userExpCount", F.format_number(F.sum(when(F.col('label') == 0, F.lit(1)).otherwise(F.lit(0))).over(
+        sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1)), NUMBER_PRECISION).cast(
+        "float")) \
+        .withColumn("userCtr",
+                    F.format_number(F.col("userClickCount") / (F.col("userExpCount") + 1), NUMBER_PRECISION).cast(
+                        "float")) \
+        .filter(F.col("userRatingCount") > 1)
+
+    samples.show(10, truncate=False)
+
+    # 连续特征分桶
+    bucket_vocab = [str(i) for i in range(101)]
+    bucket_suffix = "_Bucket"
+    for col in ["userRatingCount", "userRatingAvg", "userClickCount", "userExpCount"]:
+        new_col = col + bucket_suffix
+        samples = samples.withColumn(new_col, numberToBucketUdf(F.col(col))) \
+            .drop(col) \
+            .withColumn(new_col, F.when(F.col(new_col).isNull(), "0").otherwise(F.col(new_col)))
+        dataVocab[new_col] = bucket_vocab
+
+    # 方差处理
+    number_suffix = "_number"
+    for col in ["userRatingStddev"]:
+        new_col = col + number_suffix
+        samples = samples.withColumn(new_col, F.when(F.col(col).isNull(), 0).otherwise(1 / (F.col(col) + 1))).drop(col)
+    for col in ["userCtr"]:
+        new_col = col + number_suffix
+        samples = samples.withColumn(col, F.when(F.col(col).isNull(), 0).otherwise(F.col(col))).withColumnRenamed(col,
+                                                                                                                  new_col)
+
     samples.printSchema()
     samples.show(10,truncate=False)
     return samples
@@ -302,7 +380,7 @@ def userFeaturesToRedis(samples,columns,prefix,redisKey):
     print(prefix, resDatas.count())
     resDatas.repartition(8).foreachPartition(toRedis)
 
-def itemFeaturesToRedis(samples,itemDF,columns,redisKey):
+def itemFeaturesToRedis(itemStaticDF,redisKey):
     idCol = "item_id"
     timestampCol = "item_timestamp"
 
@@ -315,28 +393,29 @@ def itemFeaturesToRedis(samples,itemDF,columns,redisKey):
             conn.set(newKey, v)
             conn.expire(newKey, 60 * 60 * 24 * 7)
 
-    item_static_columns = [idCol] + ["itemRatingCount_Bucket", "itemRatingAvg_Bucket", "itemClickCount_Bucket", "itemExpCount_Bucket","itemRatingStddev_number","itemCtr_number"]
+    # item_static_columns = [idCol] + ["itemRatingCount_Bucket", "itemRatingAvg_Bucket", "itemClickCount_Bucket", "itemExpCount_Bucket","itemRatingStddev_number","itemCtr_number"]
     #根据timestamp获取每个user最新的记录
-    prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol))
-    item_static_df = prefixSamples.join(samples, on=[idCol], how='inner').where(F.col("timestamp") == F.col(timestampCol))
-    item_static_df = item_static_df.select(*item_static_columns)
-    item_static_df.show(10,truncate=False)
-
-    resDatas = itemDF.join(item_static_df, on=[idCol], how='left')
-
-    for col in item_static_columns:
-        res = "0"
-        if col.endswith("Bucket"):
-            res = "0"
-        if col.endswith("_number"):
-            res = 0
-        resDatas = resDatas.withColumn(col,F.when(F.col(col).isNull(), res).otherwise(F.col(col)))
-
-    resDatas.show(10,truncate=False)
+    # prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol))
+    # item_static_df = prefixSamples.join(samples, on=[idCol], how='inner').where(F.col("timestamp") == F.col(timestampCol))
+    # item_static_df = item_static_df.select(*item_static_columns)
+    # item_static_df.show(10,truncate=False)
 
-    resDatas = resDatas.select(*columns).distinct()
-    print("item size:",resDatas.count())
-    resDatas.repartition(8).foreachPartition(toRedis)
+    # resDatas = itemDF.join(itemStaticDF, on=[idCol], how='left')
+    # item_static_columns = itemStaticDF.columns
+    #
+    # for col in item_static_columns:
+    #     res = "0"
+    #     if col.endswith("Bucket"):
+    #         res = "0"
+    #     if col.endswith("_number"):
+    #         res = 0
+    #     resDatas = resDatas.withColumn(col,F.when(F.col(col).isNull(), res).otherwise(F.col(col)))
+    #
+    # resDatas.show(10,truncate=False)
+    #
+    # resDatas = resDatas.select(*columns)
+    # print("item size:",resDatas.count())
+    itemStaticDF.repartition(8).foreachPartition(toRedis)
 
 """
     数据加载
@@ -801,8 +880,11 @@ if __name__ == '__main__':
     itemDF_spark.printSchema()
     itemDF_spark.show(10, truncate=False)
 
+    # item static (statistics) feature processing
+    itemStaticDF = addItemStaticFeatures(ratingSamplesWithLabel,itemDF_spark,dataVocab)
+
     # 统计数据处理
-    ratingSamplesWithLabel = addStaticsFeatures(ratingSamplesWithLabel,dataVocab)
+    # ratingSamplesWithLabel = addStaticsFeatures(ratingSamplesWithLabel,dataVocab)
 
     samples = ratingSamplesWithLabel.join(itemDF_spark, on=['item_id'], how='inner')
 
@@ -815,7 +897,7 @@ if __name__ == '__main__':
     user_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("user")]
     print("collect feature for user:{}".format(str(user_columns)))
     # item columns
-    item_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("item")]
+    item_columns = itemStaticDF.columns
     print("collect feature for item:{}".format(str(item_columns)))
     # model columns
     print("model columns to redis...")
@@ -828,16 +910,16 @@ if __name__ == '__main__':
     dataVocabStr = json.dumps(dataVocab, ensure_ascii=False)
     open(configUtils.VOCAB_PATH, mode='w', encoding='utf-8').write(dataVocabStr)
 
+    # item特征数据存入redis
+    itemFeaturesToRedis(itemStaticDF, FEATURE_ITEM_KEY)
+    timestmp6 = int(round(time.time()))
+    print("item feature to redis 耗时s:{}".format(timestmp6 - timestmp3))
+
     """特征数据存入redis======================================"""
     # user特征数据存入redis
     userFeaturesToRedis(samplesWithUserFeatures, user_columns, "user", FEATURE_USER_KEY)
     timestmp5 = int(round(time.time()))
-    print("user feature to redis 耗时s:{}".format(timestmp5 - timestmp3))
-
-    # item特征数据存入redis
-    itemFeaturesToRedis(samplesWithUserFeatures,itemDF_spark,item_columns, FEATURE_ITEM_KEY)
-    timestmp6 = int(round(time.time()))
-    print("item feature to redis 耗时s:{}".format(timestmp6 - timestmp5))
+    print("user feature to redis 耗时s:{}".format(timestmp5 - timestmp6))
 
     """训练数据保存 ======================================"""
     timestmp3 = int(round(time.time()))
-- 
2.18.0