From d17ad1975cf60ef4988de22af24b3c173cc32f9a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=83=AD=E7=BE=BD?= <guoyu@igengmei.com>
Date: Fri, 28 May 2021 19:11:52 +0800
Subject: [PATCH] =?UTF-8?q?=E7=BE=8E=E8=B4=AD=E7=B2=BE=E6=8E=92=E6=A8=A1?=
 =?UTF-8?q?=E5=9E=8B?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 shell/featureeng_export.sh | 1 -
 spark/featureEng.py        | 7 ++++++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/shell/featureeng_export.sh b/shell/featureeng_export.sh
index 48f790d..0ffddd7 100644
--- a/shell/featureeng_export.sh
+++ b/shell/featureeng_export.sh
@@ -1,7 +1,6 @@
 path=/srv/apps/serviceRec
 day_count=$1
 content_type="service"
-source /srv/envs/serviceRec/bin/activate
 pythonFile=${path}/spark/featureEng.py
 #log_file=~/${content_type}_feature_csv_export.log
 /opt/hadoop/bin/hdfs dfs -rmr /${content_type}_feature_train
diff --git a/spark/featureEng.py b/spark/featureEng.py
index ddf902d..2ff6732 100644
--- a/spark/featureEng.py
+++ b/spark/featureEng.py
@@ -128,14 +128,19 @@ def extractTags(genres_list):
     sortedGenres = sorted(genres_dict.items(), key=lambda x: x[1], reverse=True)
     return [x[0] for x in sortedGenres]
 
+# The Spark SQL version in use does not support F.reverse, so reverse inside a Python UDF instead.
+def arrayReverse(arr):
+    """Return a reversed copy of ``arr``; ``None`` passes through as ``None`` and the input is not mutated."""
+    return None if arr is None else arr[::-1]
 
 def addUserFeatures(samples):
     extractTagsUdf = F.udf(extractTags, ArrayType(StringType()))
+    arrayReverseUdf = F.udf(arrayReverse, ArrayType(StringType()))
     samples = samples.withColumnRenamed("cl_id","userid")
     # user历史记录
     samples = samples\
         .withColumn('userPositiveHistory',F.collect_list(when(F.col('label') == 1, F.col('itemid')).otherwise(F.lit(None))).over(sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1))) \
-        .withColumn("userPositiveHistory", F.reverse(F.col("userPositiveHistory")))
+        .withColumn("userPositiveHistory", arrayReverseUdf(F.col("userPositiveHistory")))
     for i in range(1,11):
         samples = samples.withColumn("userRatedHistory"+str(i), F.when(F.col("userPositiveHistory")[i-1].isNotNull(),F.col("userPositiveHistory")[i-1]).otherwise("-1"))
     samples = samples.drop("userPositiveHistory")
-- 
2.18.0