Commit 10426904 authored by 郭羽

CTR smoothing

parent 12327a44
...@@ -21,6 +21,7 @@ sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
import utils.configUtils as configUtils
# import utils.connUtils as connUtils
import pandas as pd
import math
...@@ -217,6 +218,24 @@ def arrayReverse(arr):
    arr.reverse()
    return arr
"""
p —— 概率,即点击的概率,也就是 CTR
n —— 样本总数,即曝光数
z —— 在正态分布里,均值 + z * 标准差会有一定的置信度。例如 z 取 1.96,就有 95% 的置信度。
Wilson区间的含义就是,就是指在一定置信度下,真实的 CTR 范围是多少
"""
def wilson_ctr(num_pv, num_click):
    num_pv = float(num_pv)
    num_click = float(num_click)
    # degenerate cases: no impressions, no clicks, or clicks exceeding impressions
    if num_pv * num_click == 0 or num_pv < num_click:
        return 0.0
    z = 1.96  # 95% confidence
    n = num_pv
    p = num_click / num_pv
    # lower bound of the Wilson score interval
    score = (p + z * z / (2 * n) - z * math.sqrt((p * (1.0 - p) + z * z / (4.0 * n)) / n)) / (1.0 + z * z / n)
    return score
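# A minimal sanity check of the smoothing (illustrative counts, not from the
# data; values computed from the formula above):
#   wilson_ctr(1, 1)      -> ~0.206  raw CTR 1.0 on a single impression is damped hard
#   wilson_ctr(1000, 200) -> ~0.176  raw CTR 0.2 on a large sample barely moves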
def addUserFeatures(samples,dataVocab,multiVocab):
    dataVocab["userid"] = collectColumnToVocab(samples, "userid")
    dataVocab["user_city_id"] = collectColumnToVocab(samples, "user_city_id")
...@@ -224,6 +243,7 @@ def addUserFeatures(samples,dataVocab,multiVocab):
    extractTagsUdf = F.udf(extractTags, ArrayType(StringType()))
    arrayReverseUdf = F.udf(arrayReverse, ArrayType(StringType()))
    ctrUdf = F.udf(wilson_ctr, FloatType())  # wilson_ctr returns a scalar float, not an array
print("user历史数据处理...") print("user历史数据处理...")
# user历史记录 # user历史记录
samples = samples.withColumn('userPositiveHistory',F.collect_list(when(F.col('label') == 1, F.col('item_id')).otherwise(F.lit(None))).over(sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1))) samples = samples.withColumn('userPositiveHistory',F.collect_list(when(F.col('label') == 1, F.col('item_id')).otherwise(F.lit(None))).over(sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1)))
...@@ -264,9 +284,7 @@ def addUserFeatures(samples,dataVocab,multiVocab):
.withColumn("userExpCount", F.format_number(F.sum(when(F.col('label') == 0, F.lit(1)).otherwise(F.lit(0))).over( .withColumn("userExpCount", F.format_number(F.sum(when(F.col('label') == 0, F.lit(1)).otherwise(F.lit(0))).over(
sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1)), NUMBER_PRECISION).cast( sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1)), NUMBER_PRECISION).cast(
"float")) \ "float")) \
.withColumn("userCtr", .withColumn("userCtr",F.format_number(ctrUdf(F.col("userClickCount"),F.col("userExpCount")),NUMBER_PRECISION))\
F.format_number(F.col("userClickCount") / (F.col("userExpCount") + 1), NUMBER_PRECISION).cast(
"float")) \
.filter(F.col("userRatingCount") > 1) .filter(F.col("userRatingCount") > 1)
samples.show(10, truncate=False) samples.show(10, truncate=False)
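    # Why Wilson rather than additive smoothing (illustrative, hypothetical counts):
    # click/(exp+1) only shifts the denominator by one, while the Wilson lower
    # bound penalizes small samples progressively harder at the same raw CTR:
    #   raw 1/2    -> 0.50, wilson_ctr(2, 1)    ~ 0.095
    #   raw 50/100 -> 0.50, wilson_ctr(100, 50) ~ 0.404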
...@@ -809,113 +827,113 @@ if __name__ == '__main__':
    print(itemDF.columns)
    print(itemDF.head(10))
    # behavior data
    clickSql = getClickSql(startDay,endDay)
    expSql = getExposureSql(startDay,endDay)

    clickDF = spark.sql(clickSql)
    expDF = spark.sql(expSql)
    # ratingDF = samplesNegAndUnion(clickDF,expDF)
    ratingDF = clickDF.union(expDF)
    ratingDF = ratingDF.withColumnRenamed("time_stamp", "timestamp")\
        .withColumnRenamed("device_id", "userid")\
        .withColumnRenamed("card_id", "item_id")\
        .withColumnRenamed("page_stay", "rating")\
        .withColumnRenamed("os", "user_os")\
        .withColumn("user_city_id", F.when(F.col("user_city_id").isNull(), "-1").otherwise(F.col("user_city_id")))\
        .withColumn("timestamp",F.col("timestamp").cast("long"))

    print(ratingDF.columns)
    ratingDF.show(10, truncate=False)

    print("adding label...")
    ratingSamplesWithLabel = addSampleLabel(ratingDF)
    df = ratingSamplesWithLabel.toPandas()
    df = pd.DataFrame(df)

    posCount = df.loc[df["label"]==1]["label"].count()
    negCount = df.loc[df["label"]==0]["label"].count()
    print("pos size:"+str(posCount),"neg size:"+str(negCount))

    itemDF = get_service_feature_df()
    print(itemDF.columns)
    print(itemDF.head(10))
    # itemDF.to_csv("/tmp/service_{}.csv".format(endDay))
    # df.to_csv("/tmp/service_train_{}.csv".format(endDay))

    # data vocabulary
    dataVocab = {}
    multiVocab = {}

    print("processing item features...")
    timestmp1 = int(round(time.time()))
    itemDF = addItemFeatures(itemDF, dataVocab,multiVocab)
    timestmp2 = int(round(time.time()))
    print("processing item features took {}s".format(timestmp2 - timestmp1))
    print("multiVocab:")
    for k,v in multiVocab.items():
        print(k,len(v))

    print("dataVocab:")
    for k, v in dataVocab.items():
        print(k, len(v))

    itemDF_spark = spark.createDataFrame(itemDF)
    itemDF_spark.printSchema()
    itemDF_spark.show(10, truncate=False)

    # item statistical features
    itemStaticDF = addItemStaticFeatures(ratingSamplesWithLabel,itemDF_spark,dataVocab)

    # statistics processing
    # ratingSamplesWithLabel = addStaticsFeatures(ratingSamplesWithLabel,dataVocab)

    samples = ratingSamplesWithLabel.join(itemStaticDF, on=['item_id'], how='inner')

    print("processing user features...")
    samplesWithUserFeatures = addUserFeatures(samples,dataVocab,multiVocab)
    timestmp3 = int(round(time.time()))
    print("processing user features took {}s".format(timestmp3 - timestmp2))

    # user columns
    user_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("user")]
    print("collect feature for user:{}".format(str(user_columns)))
    # item columns
    item_columns = [c for c in itemStaticDF.columns if c.startswith("item")]
    print("collect feature for item:{}".format(str(item_columns)))
    # model columns
    print("model columns to redis...")
    model_columns = user_columns + item_columns
    featureColumnsToRedis(model_columns)

    print("saving data vocabulary...")
    print("dataVocab:", str(dataVocab.keys()))
    vocab_path = "../vocab/{}_vocab.json".format(VERSION)
    dataVocabStr = json.dumps(dataVocab, ensure_ascii=False)
    open(configUtils.VOCAB_PATH, mode='w', encoding='utf-8').write(dataVocabStr)

    # write item features to redis
    itemFeaturesToRedis(itemStaticDF, FEATURE_ITEM_KEY)
    timestmp6 = int(round(time.time()))
    print("item feature to redis took {}s".format(timestmp6 - timestmp3))

    """write features to redis ======================================"""
    # write user features to redis
    userFeaturesToRedis(samplesWithUserFeatures, user_columns, "user", FEATURE_USER_KEY)
    timestmp5 = int(round(time.time()))
    print("user feature to redis took {}s".format(timestmp5 - timestmp6))

    """save training data ======================================"""
    timestmp3 = int(round(time.time()))
    train_columns = model_columns + ["label", "timestamp", "rating"]
    trainSamples = samplesWithUserFeatures.select(*train_columns)
    train_df = trainSamples.toPandas()
    train_df = pd.DataFrame(train_df)
    train_df.to_csv(DATA_PATH_TRAIN,sep="|")
    timestmp4 = int(round(time.time()))
    print("training data written, took {}s".format(timestmp4 - timestmp3))

    print("total time: {}m".format((timestmp4 - start)/60))

    spark.stop()
\ No newline at end of file