Commit 546e2013 authored by 郭羽's avatar 郭羽

service model 优化

parent 25b34121
...@@ -129,7 +129,9 @@ def addStaticsFeatures(samples,dataVocab): ...@@ -129,7 +129,9 @@ def addStaticsFeatures(samples,dataVocab):
bucket_suffix = "_Bucket" bucket_suffix = "_Bucket"
for col in ["userRatingCount","userRatingAvg","userClickCount","userExpCount","itemRatingCount","itemRatingAvg","itemClickCount","itemExpCount"]: for col in ["userRatingCount","userRatingAvg","userClickCount","userExpCount","itemRatingCount","itemRatingAvg","itemClickCount","itemExpCount"]:
new_col = col + bucket_suffix new_col = col + bucket_suffix
samples = samples.withColumn(new_col, F.when(F.col(col).isNull(),"0").otherwise(numberToBucketUdf(F.col(col)).cast("int").cast("string"))).drop(col) samples = samples.withColumn(new_col, numberToBucketUdf(F.col(col)).cast("int").cast("string"))\
.drop(col)\
.withColumn(new_col,F.when(F.col(new_col).isNull(),"0").otherwise(F.col(new_col)))
dataVocab[new_col] = bucket_vocab dataVocab[new_col] = bucket_vocab
# 方差处理 # 方差处理
...@@ -142,7 +144,7 @@ def addStaticsFeatures(samples,dataVocab): ...@@ -142,7 +144,7 @@ def addStaticsFeatures(samples,dataVocab):
samples = samples.withColumn(col, F.when(F.col(col).isNull(), 0).otherwise(F.col(col))).withColumnRenamed(col, new_col) samples = samples.withColumn(col, F.when(F.col(col).isNull(), 0).otherwise(F.col(col))).withColumnRenamed(col, new_col)
samples.printSchema() samples.printSchema()
samples.show(10, truncate=False) samples.show(20, truncate=False)
return samples return samples
def addItemFeatures(itemDF,dataVocab,multi_col_vocab): def addItemFeatures(itemDF,dataVocab,multi_col_vocab):
...@@ -211,10 +213,8 @@ def addUserFeatures(samples,dataVocab,multiVocab): ...@@ -211,10 +213,8 @@ def addUserFeatures(samples,dataVocab,multiVocab):
print("user历史数据处理...") print("user历史数据处理...")
# user历史记录 # user历史记录
samples = samples.withColumn('userPositiveHistory',F.collect_list(when(F.col('label') == 1, F.col('item_id')).otherwise(F.lit(None))).over(sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1))) samples = samples.withColumn('userPositiveHistory',F.collect_list(when(F.col('label') == 1, F.col('item_id')).otherwise(F.lit(None))).over(sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1)))
samples.show(10,truncate=False)
samples = samples.withColumn("userPositiveHistory", arrayReverseUdf(F.col("userPositiveHistory"))) samples = samples.withColumn("userPositiveHistory", arrayReverseUdf(F.col("userPositiveHistory")))
samples.show(10,truncate=False)
for i in range(1,11): for i in range(1,11):
samples = samples.withColumn("userRatedHistory"+str(i), F.when(F.col("userPositiveHistory")[i-1].isNotNull(),F.col("userPositiveHistory")[i-1]).otherwise("-1")) samples = samples.withColumn("userRatedHistory"+str(i), F.when(F.col("userPositiveHistory")[i-1].isNotNull(),F.col("userPositiveHistory")[i-1]).otherwise("-1"))
...@@ -851,4 +851,9 @@ if __name__ == '__main__': ...@@ -851,4 +851,9 @@ if __name__ == '__main__':
# #
# print("总耗时m:{}".format((timestmp4 - start)/60)) # print("总耗时m:{}".format((timestmp4 - start)/60))
# #
# spark.stop()
\ No newline at end of file train_df = samplesWithUserFeatures.toPandas()
train_df = pd.DataFrame(train_df)
train_df.to_csv("/tmp/service_{}.csv".format(endDay))
spark.stop()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment