From a68621529fb62e7aa34c339340d3930ac41a85a7 Mon Sep 17 00:00:00 2001
From: "songke@igengmei.com" <songke@igengmei.com>
Date: Wed, 15 Dec 2021 17:12:01 +0800
Subject: [PATCH] =?UTF-8?q?=E6=A8=A1=E5=9E=8B=E8=B0=83=E8=AF=95?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 spark/featureEngSk.py | 40 ++++++++++++++++++++++++++++------------
 1 file changed, 28 insertions(+), 12 deletions(-)

diff --git a/spark/featureEngSk.py b/spark/featureEngSk.py
index 7f9ab03..07533ec 100644
--- a/spark/featureEngSk.py
+++ b/spark/featureEngSk.py
@@ -43,6 +43,7 @@ FEATURE_COLUMN_KEY = "Strategy:rec:column:service:" + VERSION
 ITEM_PREFIX = "ITEM_"
 USER_PREFIX = "USER_"
 CATEGORY_PREFIX = "CATEGORY_"
+MULTI_CATEGORY_PREFIX = "MULTI_CATEGORY_"
 NUMERIC_PREFIX = "NUMERIC_"
 DATA_PATH_TRAIN = "/data/files/service_feature_{}_train.csv".format(VERSION)
 
@@ -221,13 +222,17 @@ def itemEsFeaturesProcess(itemDF, spark):
     item_es_feature_start_time = int(round(time.time()))
 
     item_categoty_cols = ['id', 'service_type', 'merchant_id', 'doctor_type', 'doctor_id',
-                 'doctor_famous', 'hospital_id', 'hospital_city_tag_id', 'hospital_type', 'hospital_is_high_quality',
-                 'tags_v3', 'second_demands', 'second_solutions', 'second_positions']
+                 'doctor_famous', 'hospital_id', 'hospital_city_tag_id', 'hospital_type', 'hospital_is_high_quality']
+    item_multi_categots_cols =['tags_v3', 'second_demands', 'second_solutions', 'second_positions']
 
     for item_categoty_col in item_categoty_cols:
         itemDF[ITEM_PREFIX + CATEGORY_PREFIX + item_categoty_col] = itemDF[item_categoty_col]
     itemDF = itemDF.drop(columns = item_categoty_cols)
 
+    for item_multi_categots_col in item_multi_categots_cols:
+        itemDF[ITEM_PREFIX + MULTI_CATEGORY_PREFIX + item_multi_categots_col] = itemDF[item_multi_categots_col]
+    itemDF = itemDF.drop(columns = item_multi_categots_cols)
+
     item_numeric_cols = ['case_count', 'sales_count', 'discount', 'sku_price']
     for item_numeric_col in item_numeric_cols:
         itemDF[ITEM_PREFIX + NUMERIC_PREFIX + item_numeric_col] = itemDF[item_numeric_col]
@@ -294,12 +299,10 @@ def getUserProfileFeature(spark, startDay, endDay):
 
     addOneDay_UDF = F.udf(addOneDay, StringType())
     userProfileFeatureDF = userProfileFeatureDF.withColumn('partition_date', addOneDay_UDF('dt'))\
-        .withColumnRenamed("os", USER_PREFIX + CATEGORY_PREFIX + "os")\
-        .withColumnRenamed("user_city_id", USER_PREFIX + CATEGORY_PREFIX + "user_city_id")\
-        .withColumnRenamed("second_solutions", USER_PREFIX + CATEGORY_PREFIX + "second_solutions")\
-        .withColumnRenamed("second_demands", USER_PREFIX + CATEGORY_PREFIX + "second_demands")\
-        .withColumnRenamed("second_positions", USER_PREFIX + CATEGORY_PREFIX + "second_positions")\
-        .withColumnRenamed("projects", USER_PREFIX + CATEGORY_PREFIX + "projects")\
+        .withColumnRenamed("second_solutions", USER_PREFIX + MULTI_CATEGORY_PREFIX + "second_solutions")\
+        .withColumnRenamed("second_demands", USER_PREFIX + MULTI_CATEGORY_PREFIX + "second_demands")\
+        .withColumnRenamed("second_positions", USER_PREFIX + MULTI_CATEGORY_PREFIX + "second_positions")\
+        .withColumnRenamed("projects", USER_PREFIX + MULTI_CATEGORY_PREFIX + "projects")\
         .drop('dt')
     userProfileFeatureDF.cache()
     userProfileFeatureDF.show(20, False)
@@ -751,7 +754,7 @@ def init_es_query():
         },
         "query": {
             "bool": {
-                "must": [{"term": {"is_online": True}}],
+                "must": [],
                 "must_not": [],
                 "should": []
             }
@@ -955,7 +958,9 @@ if __name__ == '__main__':
         .join(expStaticFeatures, on = ["card_id", "partition_date"], how = 'left')\
         .join(itemEsFeatureDF, on = ["card_id"], how = 'left')
     samples = samples.withColumnRenamed("card_id", ITEM_PREFIX + CATEGORY_PREFIX + "card_id")\
-        .withColumnRenamed("device_id", USER_PREFIX + CATEGORY_PREFIX + "device_id")\
+        .withColumnRenamed("device_id", USER_PREFIX + CATEGORY_PREFIX + "device_id") \
+        .withColumnRenamed("os", USER_PREFIX + CATEGORY_PREFIX + "os") \
+        .withColumnRenamed("user_city_id", USER_PREFIX + CATEGORY_PREFIX + "user_city_id") \
         .drop("partition_date", "timestamp")
 
     # | -- card_id: string(nullable=true)
@@ -993,8 +998,19 @@ if __name__ == '__main__':
     # | -- ITEM_CATEGORY_projects: string(nullable=true)
     # | -- ITEM_NUMERIC_sku_price: double(nullable=true)
     #
-    print(samples.schema.fields)
-    print([field.name for field in samples.schema.fields])
+    fields = [field.name for field in samples.schema.fields]
+
+    fields_na_value_dict = {}
+    for field in fields:
+        if field.startswith(ITEM_PREFIX + CATEGORY_PREFIX) or field.startswith(USER_PREFIX + CATEGORY_PREFIX):
+            fields_na_value_dict[field] = '-1'
+        elif field.startswith(ITEM_PREFIX + MULTI_CATEGORY_PREFIX) or field.startswith(USER_PREFIX + MULTI_CATEGORY_PREFIX):
+            fields_na_value_dict[field] = ['-1']
+        elif field.startswith(ITEM_PREFIX + NUMERIC_PREFIX) or field.startswith(USER_PREFIX + NUMERIC_PREFIX):
+            fields_na_value_dict[field] = 0
+
+    samples.na.fill(fields_na_value_dict)
+
     samples.printSchema()
     samples.show(20, False)
     sys.exit()
-- 
2.18.0