Commit dc65d8fe authored by 郭羽's avatar 郭羽

update feature

parent 37995a94
...@@ -334,22 +334,20 @@ def featuresToRedis(samples,columns,prefix,redisKey): ...@@ -334,22 +334,20 @@ def featuresToRedis(samples,columns,prefix,redisKey):
timestampCol = idCol+"_timestamp" timestampCol = idCol+"_timestamp"
def toRedis(datas): def toRedis(datas):
# utils路径无法找到情况
conn = connUtils.getRedisConn() conn = connUtils.getRedisConn()
for d in datas: for d in datas:
k = d[idCol] k = d[idCol]
v = json.dumps(d.asDict(), ensure_ascii=False) v = json.dumps(d.asDict(), ensure_ascii=False)
newKey = redisKey + k newKey = redisKey + k
print(newKey,v)
conn.set(newKey, v) conn.set(newKey, v)
conn.expire(newKey, 60 * 60 * 24 * 7) conn.expire(newKey, 60 * 60 * 24 * 7)
#根据timestamp获取每个user最新的记录 #根据timestamp获取每个user最新的记录
prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol)) prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol))
resDF = samples.join(prefixSamples, on=[idCol], how='left').where(F.col("timestamp") == F.col(timestampCol)) resDatas = samples.join(prefixSamples, on=[idCol], how='left').where(F.col("timestamp") == F.col(timestampCol))
distinctDF = resDF.select(*columns).distinct() resDatas = resDatas.select(*columns).distinct()
print(prefix,distinctDF.count()) print(prefix, len(resDatas))
distinctDF.foreachPartition(toRedis) resDatas.repartition(8).foreachPartition(toRedis)
""" """
数据加载 数据加载
...@@ -810,24 +808,21 @@ if __name__ == '__main__': ...@@ -810,24 +808,21 @@ if __name__ == '__main__':
timestmp4 = int(round(time.time())) timestmp4 = int(round(time.time()))
# user特征数据存入redis # user特征数据存入redis
# featuresToRedis(samplesWithUserFeatures, user_columns, "user", FEATURE_USER_KEY) featuresToRedis(samplesWithUserFeatures, user_columns, "user", FEATURE_USER_KEY)
userDatas = collectFeaturesToDict(samplesWithUserFeatures, user_columns, "user")
print("user feature to collect 耗时s:{}".format(int(round(time.time()))-timestmp4))
timestmp4 = int(round(time.time()))
featureToRedis(FEATURE_USER_KEY, userDatas)
timestmp5 = int(round(time.time())) timestmp5 = int(round(time.time()))
print("user feature to redis 耗时s:{}".format(timestmp5 - timestmp4)) print("user feature to redis 耗时s:{}".format(timestmp5-timestmp4))
# userDatas = collectFeaturesToDict(samplesWithUserFeatures, user_columns, "user")
# featureToRedis(FEATURE_USER_KEY, userDatas)
# item特征数据存入redis # item特征数据存入redis
# todo 添加最近一个月有行为的item,待优化:扩大item范围 # todo 添加最近一个月有行为的item,待优化:扩大item范围
# featuresToRedis(samplesWithUserFeatures, item_columns, "item", FEATURE_ITEM_KEY) featuresToRedis(samplesWithUserFeatures, item_columns, "item", FEATURE_ITEM_KEY)
itemDatas = collectFeaturesToDict(samplesWithUserFeatures, item_columns, "item")
print("item feature to collect 耗时s:{}".format(int(round(time.time())) - timestmp5))
timestmp5 = int(round(time.time()))
featureToRedis(FEATURE_ITEM_KEY, itemDatas)
timestmp6 = int(round(time.time())) timestmp6 = int(round(time.time()))
print("item feature to redis 耗时s:{}".format(timestmp6 - timestmp5)) print("item feature to redis 耗时s:{}".format(timestmp6 - timestmp5))
# itemDatas = collectFeaturesToDict(samplesWithUserFeatures, item_columns, "item")
# featureToRedis(FEATURE_ITEM_KEY, itemDatas)
# model columns # model columns
print("model columns to redis...") print("model columns to redis...")
model_columns = user_columns + item_columns model_columns = user_columns + item_columns
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment