Commit 5d7f1af2 authored by 郭羽's avatar 郭羽

service model 优化

parent a7735bd8
......@@ -229,13 +229,12 @@ def addUserFeatures(samples,dataVocab,multiVocab):
samples = samples.withColumn(new_col + "__" + str(i),F.when(F.col(new_col)[i - 1].isNotNull(), F.col(new_col)[i - 1]).otherwise("-1"))
dataVocab[new_col + "__" + str(i)] = v
samples = samples.drop(new_col)
samples = samples.drop(new_col).drop(c)
samples.printSchema()
samples.show(10,truncate=False)
return samples
def addSampleLabel(ratingSamples):
ratingSamples = ratingSamples.withColumn('label', when(F.col('rating') >= 1, 1).otherwise(0))
ratingSamples.show(5, truncate=False)
......@@ -303,7 +302,7 @@ def featureToRedis(key,datas):
conn.set(newKey,v)
conn.expire(newKey, 60 * 60 * 24 * 7)
def featuresToRedis(samples,columns,prefix,redisKey):
def userFeaturesToRedis(samples,columns,prefix,redisKey):
idCol = prefix+"id"
timestampCol = idCol+"_timestamp"
......@@ -323,6 +322,39 @@ def featuresToRedis(samples,columns,prefix,redisKey):
print(prefix, resDatas.count())
resDatas.repartition(8).foreachPartition(toRedis)
def itemFeaturesToRedis(samples,itemDF,columns,redisKey):
idCol = "item_id"
timestampCol = "item_timestamp"
def toRedis(datas):
conn = getRedisConn()
for d in datas:
k = d[idCol]
v = json.dumps(d.asDict(), ensure_ascii=False)
newKey = redisKey + k
conn.set(newKey, v)
conn.expire(newKey, 60 * 60 * 24 * 7)
#根据timestamp获取每个user最新的记录
prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol))
item_static_columns = [col for col in prefixSamples.columns if col.endswith("Bucket") or col.endswith("_number")]
prefixSamples = prefixSamples.select(*item_static_columns)
prefixSamples.show(10,truncate=False)
resDatas = itemDF.join(prefixSamples, on=[idCol], how='left').where(F.col("timestamp") == F.col(timestampCol))
for col in item_static_columns:
res = "0"
if col.endswith("Bucket"):
res = "0"
if col.endswith("_number"):
res = 0
resDatas = resDatas.withColumn(col,F.when(F.col(col).isNull(), res).otherwise(F.col(col)))
resDatas.show(10,truncate=False)
resDatas = resDatas.select(*columns).distinct()
print("item size:",len(resDatas))
resDatas.repartition(8).foreachPartition(toRedis)
"""
数据加载
"""
......@@ -813,12 +845,12 @@ if __name__ == '__main__':
"""特征数据存入redis======================================"""
# user特征数据存入redis
featuresToRedis(samplesWithUserFeatures, user_columns, "user", FEATURE_USER_KEY)
userFeaturesToRedis(samplesWithUserFeatures, user_columns, "user", FEATURE_USER_KEY)
timestmp5 = int(round(time.time()))
print("user feature to redis 耗时s:{}".format(timestmp5 - timestmp3))
# item特征数据存入redis
featuresToRedis(samplesWithUserFeatures, item_columns, "item", FEATURE_ITEM_KEY)
itemFeaturesToRedis(samplesWithUserFeatures,itemDF_spark,item_columns, FEATURE_ITEM_KEY)
timestmp6 = int(round(time.time()))
print("item feature to redis 耗时s:{}".format(timestmp6 - timestmp5))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment