Commit 3203f761 authored by 郭羽

美购精排模型耗时优化

parent e5cccd39
......@@ -6,4 +6,4 @@ pythonFile=${path}/spark/featureEng.py
#log_file=~/${content_type}_feature_csv_export.log
/opt/hadoop/bin/hdfs dfs -rmr /${content_type}_feature_v1_train
/opt/hadoop/bin/hdfs dfs -rmr /${content_type}_feature_v1_test
/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 4g --executor-memory 2g --executor-cores 1 --num-executors 8 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar ${pythonFile} $day_count
/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 8g --executor-memory 2g --executor-cores 1 --num-executors 8 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar ${pythonFile} $day_count
......@@ -91,6 +91,7 @@ def addItemFeatures(samples,itemDF):
samples = samples.join(itemDF, on=['itemid'], how='inner')
# 统计特征处理
print("统计特征处理...")
staticFeatures = samples.groupBy('itemid').agg(F.count(F.lit(1)).alias('itemRatingCount'),
F.avg(F.col('rating')).alias('itemRatingAvg'),
F.stddev(F.col('rating')).alias('itemRatingStddev')).fillna(0) \
......@@ -100,6 +101,7 @@ def addItemFeatures(samples,itemDF):
# join item rating features
samples = samples.join(staticFeatures, on=['itemid'], how='left')
print("连续特征处理...")
# 连续特征处理
pipelineStage = []
# Normalization
......@@ -119,7 +121,7 @@ def addItemFeatures(samples,itemDF):
samples = samples.withColumn(c + "Bucket",F.col(c + "Bucket").cast("string"))
samples.printSchema()
samples.show(5, truncate=False)
# samples.show(5, truncate=False)
return samples
......@@ -140,6 +142,7 @@ def addUserFeatures(samples):
extractTagsUdf = F.udf(extractTags, ArrayType(StringType()))
arrayReverseUdf = F.udf(arrayReverse, ArrayType(StringType()))
samples = samples.withColumnRenamed("cl_id","userid")
print("user历史数据处理...")
# user历史记录
samples = samples\
.withColumn('userPositiveHistory',F.collect_list(when(F.col('label') == 1, F.col('itemid')).otherwise(F.lit(None))).over(sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1))) \
......@@ -149,6 +152,7 @@ def addUserFeatures(samples):
samples = samples.drop("userPositiveHistory")
# user历史点击分值统计
print("统计特征处理...")
samples = samples\
.withColumn('userRatingCount',F.count(F.lit(1)).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1))) \
.withColumn("userRatingAvg", F.format_number(F.avg(F.col("rating")).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),NUMBER_PRECISION).cast("float")) \
......@@ -164,14 +168,6 @@ def addUserFeatures(samples):
samples = samples.drop(new_col)
# .drop(c).drop(new_col)
# tags
# c = "tags_v3"
# new_col = "user" + "__" + c
# samples = samples.withColumn(new_col, extractTagsUdf(F.collect_list(when(F.col('label') == 1, F.col(c)).otherwise(F.lit(None))).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1))))
# for i in range(1,10):
# samples = samples.withColumn(new_col+"__"+str(i), F.when(F.col(new_col)[i-1].isNotNull(),F.col(new_col)[i-1]).otherwise("-1"))
# samples = samples.drop(new_col)
pipelineStage = []
# Normalization
......@@ -190,7 +186,7 @@ def addUserFeatures(samples):
samples = samples.withColumn(c + "Bucket", F.col(c + "Bucket").cast("string"))
samples.printSchema()
samples.show(5,truncate=False)
# samples.show(5,truncate=False)
return samples
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment