Commit 70bb8fab authored by 郭羽

美购 (beauty-purchase) fine-ranking model

parent 28556a05
@@ -8,10 +8,10 @@ import os
sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
import utils.connUtils as connUtils
ITEM_NUMBER_COLUMNS = ["smart_rank2"]
embedding_columns = ["itemid","userid","doctor_id","hospital_id"]
ITEM_NUMBER_COLUMNS = ["item_"+c for c in ["smart_rank2"]]
embedding_columns = ["itemid","userid"] + ["item_"+c for c in ["doctor_id","hospital_id"]]
multi_columns = ["tags_v3","first_demands","second_demands","first_solutions","second_solutions","first_positions","second_positions"]
one_hot_columns = ["service_type","doctor_type","doctor_famous","hospital_city_tag_id","hospital_type","hospital_is_high_quality"]
one_hot_columns = ["item_"+c for c in ["service_type","doctor_type","doctor_famous","hospital_city_tag_id","hospital_type","hospital_is_high_quality"]]
# history_columns = ["userRatedHistory"]
# data loading
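Note: the change above only renames raw item-side columns by prepending "item_", so user and item features can later be separated by prefix. A minimal sketch of the resulting names (plain Python, illustration only):

ITEM_NUMBER_COLUMNS = ["item_" + c for c in ["smart_rank2"]]
embedding_columns = ["itemid", "userid"] + ["item_" + c for c in ["doctor_id", "hospital_id"]]
print(ITEM_NUMBER_COLUMNS)  # ['item_smart_rank2']
print(embedding_columns)    # ['itemid', 'userid', 'item_doctor_id', 'item_hospital_id']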
@@ -54,8 +54,17 @@ ITEM_NUMBER_COLUMNS = ["lowest_price","smart_rank2","case_count","ordered_user_i
ITEM_CATE_COLUMNS = ["service_type","doctor_type","doctor_id","doctor_famous","hospital_id","hospital_city_tag_id","hospital_type","hospital_is_high_quality"]
NUMBER_PRECISION = 2
VERSION = "v1"
+FEATURE_USER_KEY = "Strategy:rec:feature:service:" + VERSION + ":user:"
+FEATURE_ITEM_KEY = "Strategy:rec:feature:service:" + VERSION + ":item:"
+FEATURE_VOCAB_KEY = "Strategy:rec:vocab:service:" + VERSION
+FEATURE_COLUMN_KEY = "Strategy:rec:column:service:" + VERSION
+TRAIN_FILE_PATH = "service_feature_" + VERSION
def addItemFeatures(samples,itemDF):
prefix = "item_"
itemDF = itemDF.withColumnRenamed("id", "itemid")
samples = samples.join(itemDF, on=['itemid'], how='left')
# 数据过滤:无医生
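Note: the new FEATURE_* constants centralize the Redis key layout that was previously built inline. For reference, a sketch of how a key expands (the trailing id is appended per record by featureToRedis later in this diff; "123" is a hypothetical userid):

VERSION = "v1"
FEATURE_USER_KEY = "Strategy:rec:feature:service:" + VERSION + ":user:"
print(FEATURE_USER_KEY + "123")  # Strategy:rec:feature:service:v1:user:123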
@@ -63,26 +72,26 @@ def addItemFeatures(samples,itemDF):
    # null handling
    for c in ITEM_NUMBER_COLUMNS:
        print("null count:",c,samples.filter(col(c).isNull()).count())
-        samples = samples.withColumn(c,when(col(c).isNull(),0).otherwise(col(c)).cast("float"))
+        samples = samples.withColumn(prefix+c,when(col(c).isNull(),0).otherwise(col(c)).cast("float")).drop(c)
    for c in ITEM_CATE_COLUMNS:
        print("null count:", c, samples.filter(col(c).isNull()).count())
-        samples = samples.withColumn(c, F.when(F.col(c).isNull(), "-1").otherwise(F.col(c)))
+        samples = samples.withColumn(prefix+c, F.when(F.col(c).isNull(), "-1").otherwise(F.col(c))).drop(c)
    # discrete (multi-value) feature handling
    for c, v in ITEM_MULTI_COLUMN_EXTRA_MAP.items():
        print("null count:", c, samples.filter(col(c).isNull()).count())
        samples = samples.withColumn(c, F.when(F.col(c).isNull(), "-1").otherwise(F.col(c)))
        for i in range(1, v + 1):
-            new_c = c + "__" + str(i)
+            new_c = prefix + c + "__" + str(i)
            samples = samples.withColumn(new_c, F.split(F.col(c), ",")[i - 1])
            samples = samples.withColumn(new_c, F.when(F.col(new_c).isNull(), "-1").otherwise(F.col(new_c)))
    # statistical feature processing
    staticFeatures = samples.groupBy('itemid').agg(F.count(F.lit(1)).alias('itemRatingCount'),
                                                   F.avg(F.col('rating')).alias('itemRatingAvg'),
-                                                  F.stddev(F.col('rating')).alias('itemRatingStddev')).fillna(0)\
-        .withColumn('itemRatingStddev', F.format_number(F.col('itemRatingStddev'), NUMBER_PRECISION).cast("float"))\
+                                                  F.stddev(F.col('rating')).alias('itemRatingStddev')).fillna(0) \
+        .withColumn('itemRatingStddev', F.format_number(F.col('itemRatingStddev'), NUMBER_PRECISION).cast("float")) \
        .withColumn('itemRatingAvg', F.format_number(F.col('itemRatingAvg'), NUMBER_PRECISION).cast("float"))
    # join item rating features
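Note: the statistics block above computes per-item rating count/avg/stddev and rounds to NUMBER_PRECISION decimals. A self-contained sketch on toy data (assumes a local SparkSession; column names match the diff):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
toy = spark.createDataFrame([(1, 5.0), (1, 3.0), (2, 4.0)], ["itemid", "rating"])
stats = toy.groupBy("itemid").agg(F.count(F.lit(1)).alias("itemRatingCount"),
                                  F.avg(F.col("rating")).alias("itemRatingAvg"),
                                  F.stddev(F.col("rating")).alias("itemRatingStddev")).fillna(0) \
    .withColumn("itemRatingStddev", F.format_number(F.col("itemRatingStddev"), 2).cast("float")) \
    .withColumn("itemRatingAvg", F.format_number(F.col("itemRatingAvg"), 2).cast("float"))
stats.show()  # itemid=1: count 2, avg 4.0, stddev 1.41; itemid=2: single rating, stddev null -> 0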
@@ -95,12 +104,17 @@ def addItemFeatures(samples,itemDF):
# pipelineStage.append(MinMaxScaler(inputCol=c, outputCol=c+"Scale"))
# bucketing
for c in ["case_count", "ordered_user_ids_count","itemRatingCount","lowest_price","itemRatingStddev","itemRatingAvg"]:
pipelineStage.append(QuantileDiscretizer(numBuckets=20, inputCol=c, outputCol=c + "Bucket"))
bucketColumns = [prefix+"case_count", prefix+"ordered_user_ids_count", prefix+"lowest_price", "itemRatingCount", "itemRatingStddev","itemRatingAvg"]
for c in bucketColumns:
pipelineStage.append(QuantileDiscretizer(numBuckets=10, inputCol=c, outputCol=c + "Bucket"))
featurePipeline = Pipeline(stages=pipelineStage)
samples = featurePipeline.fit(samples).transform(samples)
# 转string
for c in bucketColumns:
samples = samples.withColumn(c + "Bucket",F.col(c + "Bucket").cast("string"))
samples.printSchema()
samples.show(5, truncate=False)
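Note: QuantileDiscretizer emits the bucket index as a double column, which is why the added loop casts each *Bucket column to string before the values enter the feature vocabulary. A minimal sketch of one bucketed column under that scheme (local SparkSession assumed):

from pyspark.ml.feature import QuantileDiscretizer
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(float(i),) for i in range(100)], ["item_lowest_price"])
qd = QuantileDiscretizer(numBuckets=10, inputCol="item_lowest_price", outputCol="item_lowest_priceBucket")
df = qd.fit(df).transform(df)
df = df.withColumn("item_lowest_priceBucket", F.col("item_lowest_priceBucket").cast("string"))
df.show(3)  # the bucket index is now a string feature, e.g. "0.0"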
@@ -157,14 +171,18 @@ def addUserFeatures(samples):
# pipelineStage.append(MinMaxScaler(inputCol=c, outputCol=c + "Scale"))
# bucketing
for c in ["userRatingCount","userRatingAvg","userRatingStddev"]:
pipelineStage.append(QuantileDiscretizer(numBuckets=20, inputCol=c, outputCol=c + "Bucket"))
bucketColumns = ["userRatingCount","userRatingAvg","userRatingStddev"]
for c in bucketColumns:
pipelineStage.append(QuantileDiscretizer(numBuckets=10, inputCol=c, outputCol=c + "Bucket"))
featurePipeline = Pipeline(stages=pipelineStage)
samples = featurePipeline.fit(samples).transform(samples)
# 转string
for c in bucketColumns:
samples = samples.withColumn(c + "Bucket", F.col(c + "Bucket").cast("string"))
samples.printSchema()
samples.show(10)
samples.show(5,truncate=False)
return samples
@@ -235,14 +253,34 @@ def getDataVocab(samples):
    return dataVocab
-def dataVocabToRedis(dataVocab,version="v1"):
+def dataVocabToRedis(dataVocab):
    conn = connUtils.getRedisConn()
-    key = "Strategy:rec:vocab:service:"+version
-    conn.set(key,dataVocab)
-    conn.expire(key,60 * 60 * 24 * 7)
+    conn.set(FEATURE_VOCAB_KEY,dataVocab)
+    conn.expire(FEATURE_VOCAB_KEY,60 * 60 * 24 * 7)
+def featureColumnsToRedis(columns):
+    conn = connUtils.getRedisConn()
+    conn.set(FEATURE_COLUMN_KEY, json.dumps(columns))
+    conn.expire(FEATURE_COLUMN_KEY, 60 * 60 * 24 * 7)
+def featureToRedis(key,datas):
+    conn = connUtils.getRedisConn()
+    pipeline = conn.pipeline()
+    for k,v in datas.items():
+        newKey = key+k
+        pipeline.set(newKey,v)
+        pipeline.expire(newKey, 60 * 60 * 24 * 7)
+    pipeline.execute()
+    pipeline.close()
+    conn.close()
-def featureToRedis():
-    pass
+def collectFeaturesToDict(samples,columns,prefix):
+    idCol = prefix+"id"
+    # use timestamp to get each user's latest record
+    prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias("timestamp"))
+    resDatas = prefixSamples.join(samples, on=[idCol,"timestamp"], how='left').select(*columns).distinct().collect()
+    return {d[idCol]:json.dumps(d.asDict(),ensure_ascii=False) for d in resDatas}
"""
@@ -607,10 +645,33 @@ if __name__ == '__main__':
    dataVocabStr = json.dumps(dataVocab,ensure_ascii=False)
    dataVocabToRedis(dataVocabStr)
-    file_path = "/service_feature"
+    # user columns
+    user_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("user")]
+    print("collect feature for user:{}".format(str(user_columns)))
+    # item columns
+    item_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("item")]
+    print("collect feature for item:{}".format(str(item_columns)))
+    # write user feature data to redis
+    print("user feature to redis...")
+    userDatas = collectFeaturesToDict(samplesWithUserFeatures, user_columns, "user")
+    featureToRedis(FEATURE_USER_KEY, userDatas)
+    # write item feature data to redis
+    print("item feature to redis...")
+    itemDatas = collectFeaturesToDict(samplesWithUserFeatures, item_columns, "item")
+    featureToRedis(FEATURE_ITEM_KEY, itemDatas)
+    # model columns
+    print("model columns to redis...")
+    model_columns = user_columns + item_columns
+    featureColumnsToRedis(model_columns)
+    train_columns = model_columns + ["label", "timestamp"]
+    trainSamples = samplesWithUserFeatures.select(*train_columns)
print("write to hdfs start...")
splitTimestamp = int(time.mktime(time.strptime(endDay, "%Y%m%d")))
splitAndSaveTrainingTestSamplesByTimeStamp(samplesWithUserFeatures, splitTimestamp, file_path)
splitAndSaveTrainingTestSamplesByTimeStamp(samplesWithUserFeatures, splitTimestamp, TRAIN_FILE_PATH)
print("write to hdfs success...")