Commit f370c36e authored by 郭羽's avatar 郭羽

service model 优化

parent db3c60f5
......@@ -11,11 +11,9 @@ import redis
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql as sql
from pyspark.sql.functions import when,col
from pyspark.sql.functions import when
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, QuantileDiscretizer, MinMaxScaler
from collections import defaultdict
import json
......@@ -33,27 +31,6 @@ import pandas as pd
"""
特征工程
"""
ITEM_MULTI_COLUMN_EXTRA_MAP = {"first_demands": 1,
"second_demands": 5,
"first_solutions": 1,
"second_solutions": 5,
"first_positions": 1,
"second_positions": 5,
"tags_v3": 10,
}
USER_MULTI_COLUMN_EXTRA_MAP = {"first_demands": 1,
"second_demands": 3,
"first_solutions": 1,
"second_solutions": 3,
"first_positions": 1,
"second_positions": 3,
"tags_v3": 5,
}
ITEM_NUMBER_COLUMNS = ["lowest_price","smart_rank2","case_count","ordered_user_ids_count"]
ITEM_CATE_COLUMNS = ["service_type","merchant_id","doctor_type","doctor_id","doctor_famous","hospital_id","hospital_city_tag_id","hospital_type","hospital_is_high_quality"]
NUMBER_PRECISION = 2
VERSION = configUtils.SERVICE_VERSION
......@@ -62,8 +39,8 @@ FEATURE_ITEM_KEY = "Strategy:rec:feature:service:" + VERSION + ":item:"
FEATURE_VOCAB_KEY = "Strategy:rec:vocab:service:" + VERSION
FEATURE_COLUMN_KEY = "Strategy:rec:column:service:" + VERSION
TRAIN_FILE_PATH = "service_feature_" + VERSION
ITEM_PREFIX = "item_"
DATA_PATH_TRAIN = "/data/files/service_feature_{}_train.csv".format(VERSION)
def getRedisConn():
......@@ -107,7 +84,7 @@ priceToBucketUdf = F.udf(priceToBucket, FloatType())
def addStaticsFeatures(samples,dataVocab):
print("user统计特征处理...")
samples = samples \
.withColumn('userRatingCount',F.format_number(F.count(F.lit(1)).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)), NUMBER_PRECISION).cast("float")) \
.withColumn('userRatingCount',F.format_number(F.sum(F.lit(1)).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)), NUMBER_PRECISION).cast("float")) \
.withColumn("userRatingAvg", F.format_number(F.avg(F.col("rating")).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)), NUMBER_PRECISION).cast("float")) \
.withColumn("userRatingStddev", F.format_number(F.stddev(F.col("rating")).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),NUMBER_PRECISION).cast("float")) \
.withColumn("userClickCount", F.format_number(F.sum(when(F.col('label') == 1, F.lit(1)).otherwise(F.lit(0))).over(sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1)),NUMBER_PRECISION).cast("float")) \
......@@ -117,13 +94,15 @@ def addStaticsFeatures(samples,dataVocab):
print("item统计特征处理...")
samples = samples \
.withColumn('itemRatingCount',F.format_number(F.count(F.lit(1)).over(sql.Window.partitionBy('item_id').orderBy('timestamp').rowsBetween(-100, -1)), NUMBER_PRECISION).cast("float")) \
.withColumn('itemRatingCount',F.format_number(F.sum(F.lit(1)).over(sql.Window.partitionBy('item_id').orderBy('timestamp').rowsBetween(-100, -1)), NUMBER_PRECISION).cast("float")) \
.withColumn("itemRatingAvg", F.format_number(F.avg(F.col("rating")).over(sql.Window.partitionBy('item_id').orderBy('timestamp').rowsBetween(-100, -1)), NUMBER_PRECISION).cast("float")) \
.withColumn("itemRatingStddev", F.format_number(F.stddev(F.col("rating")).over(sql.Window.partitionBy('item_id').orderBy('timestamp').rowsBetween(-100, -1)),NUMBER_PRECISION).cast("float")) \
.withColumn("itemClickCount", F.format_number(F.sum(when(F.col('label') == 1, F.lit(1)).otherwise(F.lit(0))).over(sql.Window.partitionBy("item_id").orderBy(F.col("timestamp")).rowsBetween(-100, -1)),NUMBER_PRECISION).cast("float")) \
.withColumn("itemExpCount", F.format_number(F.sum(when(F.col('label') == 0, F.lit(1)).otherwise(F.lit(0))).over(sql.Window.partitionBy("item_id").orderBy(F.col("timestamp")).rowsBetween(-100, -1)),NUMBER_PRECISION).cast("float")) \
.withColumn("itemCtr", F.format_number(F.col("itemClickCount")/(F.col("itemExpCount")+1),NUMBER_PRECISION).cast("float")) \
samples.show(20, truncate=False)
# 连续特征分桶
bucket_vocab = [str(i) for i in range(101)]
bucket_suffix = "_Bucket"
......@@ -335,7 +314,7 @@ def itemFeaturesToRedis(samples,itemDF,columns,redisKey):
conn.set(newKey, v)
conn.expire(newKey, 60 * 60 * 24 * 7)
item_static_columns = [idCol] + ["itemRatingCountBucket", "itemRatingAvgBucket", "itemClickCountBucket", "itemExpCountBucket","itemRatingStddev_number","itemCtr_number"]
item_static_columns = [idCol] + ["itemRatingCount_Bucket", "itemRatingAvg_Bucket", "itemClickCount_Bucket", "itemExpCount_Bucket","itemRatingStddev_number","itemCtr_number"]
#根据timestamp获取每个user最新的记录
prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol))
item_static_df = samples.join(prefixSamples, on=[idCol], how='left').where(F.col("timestamp") == F.col(timestampCol))
......@@ -865,7 +844,7 @@ if __name__ == '__main__':
trainSamples = samplesWithUserFeatures.select(*train_columns)
train_df = trainSamples.toPandas()
train_df = pd.DataFrame(train_df)
train_df.to_csv("/tmp/service_{}.csv".format(endDay))
train_df.to_csv(DATA_PATH_TRAIN)
timestmp4 = int(round(time.time()))
print("训练数据写入success 耗时s:{}".format(timestmp4 - timestmp3))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment