Commit 7fb19aef authored by 宋柯's avatar 宋柯

模型调试

parent a794ea12
......@@ -40,7 +40,9 @@ FEATURE_ITEM_KEY = "Strategy:rec:feature:service:" + VERSION + ":item:"
FEATURE_VOCAB_KEY = "Strategy:rec:vocab:service:" + VERSION
FEATURE_COLUMN_KEY = "Strategy:rec:column:service:" + VERSION
ITEM_PREFIX = "item_"
ITEM_PREFIX = "ITEM_"
CATEGORY_PREFIX = "CATEGORY_"
NUMERIC_PREFIX = "NUMERIC_"
DATA_PATH_TRAIN = "/data/files/service_feature_{}_train.csv".format(VERSION)
......@@ -131,13 +133,13 @@ def getItemStaticFeatures(itemStatisticDays, startDay, endDay):
staticFeatures = spark.sql(itemStatisticSql)
clickStaticFeatures = staticFeatures.where(F.col('label') == F.lit(1))\
.withColumnRenamed('label_count_sum', 'click_count_sum')\
.withColumnRenamed('label_count_avg', 'click_count_avg')\
.withColumnRenamed('label_count_stddev', 'click_count_stddev')
.withColumnRenamed('label_count_sum', ITEM_PREFIX + NUMERIC_PREFIX + 'click_count_sum')\
.withColumnRenamed('label_count_avg', ITEM_PREFIX + NUMERIC_PREFIX + 'click_count_avg')\
.withColumnRenamed('label_count_stddev', ITEM_PREFIX + NUMERIC_PREFIX + 'click_count_stddev')
expStaticFeatures = staticFeatures.where(F.col('label') == F.lit(0))\
.withColumnRenamed('label_count_sum', 'exp_count_sum')\
.withColumnRenamed('label_count_avg', 'exp_count_avg')\
.withColumnRenamed('label_count_stddev', 'exp_count_stddev')
.withColumnRenamed('label_count_sum', ITEM_PREFIX + NUMERIC_PREFIX + 'exp_count_sum')\
.withColumnRenamed('label_count_avg', ITEM_PREFIX + NUMERIC_PREFIX + 'exp_count_avg')\
.withColumnRenamed('label_count_stddev', ITEM_PREFIX + NUMERIC_PREFIX + 'exp_count_stddev')
drop_columns = ['label', 'label_count']
clickStaticFeatures = clickStaticFeatures.drop(*drop_columns)
......@@ -218,37 +220,18 @@ def itemEsFeaturesProcess(itemDF, spark):
print("item es 特征工程 ")
item_es_feature_start_time = int(round(time.time()))
onehot_cols = ['id', 'service_type', 'merchant_id', 'doctor_type', 'doctor_id', 'doctor_famous', 'hospital_id', 'hospital_city_tag_id', 'hospital_type', 'hospital_is_high_quality']
multi_cols = ['tags_v3', 'second_demands', 'second_solutions', 'second_positions']
item_categoty_cols = ['id', 'service_type', 'merchant_id', 'doctor_type', 'doctor_id',
'doctor_famous', 'hospital_id', 'hospital_city_tag_id', 'hospital_type', 'hospital_is_high_quality',
'tags_v3', 'second_demands', 'second_solutions', 'second_positions']
for onehot_col in onehot_cols:
itemDF[ITEM_PREFIX + onehot_col] = itemDF[onehot_col]
itemDF = itemDF.drop(columns = onehot_cols)
for item_categoty_col in item_categoty_cols:
itemDF[ITEM_PREFIX + CATEGORY_PREFIX + item_categoty_col] = itemDF[item_categoty_col]
itemDF = itemDF.drop(columns = item_categoty_cols)
for multi_col in multi_cols:
#TODO 这里多标签的应该拆开
# multi_col_unique = list(set(flatten(map(lambda x: x.split(','), itemDF[multi_col].tolist()))))
itemDF[multi_col] = itemDF[multi_col].map(lambda x: x.split(","))
for idx in range(1, 6):
itemDF[ITEM_PREFIX + multi_col + "__" + str(idx)] = itemDF[multi_col].map(lambda tagArray: parseTagsFromArray(tagArray, idx))
itemDF = itemDF.drop(columns = multi_cols)
# 连续特征分桶
# bucket_vocab = [str(i) for i in range(101)]
bucket_suffix = "_Bucket"
for col in ['case_count', 'sales_count']:
itemDF[ITEM_PREFIX + col + bucket_suffix] = itemDF[col].map(numberToBucket)
itemDF = itemDF.drop(columns = [col])
for col in ['sku_price']:
itemDF[ITEM_PREFIX + col + bucket_suffix] = itemDF[col].map(priceToBucket)
itemDF = itemDF.drop(columns = [col])
# 连续数据处理
number_suffix = "_number"
for col in ["discount"]:
itemDF[ITEM_PREFIX + col + number_suffix] = itemDF[col]
itemDF = itemDF.drop(columns=[col])
item_numeric_cols = ['case_count', 'sales_count', 'discount', 'sku_price']
for item_numeric_col in item_numeric_cols:
itemDF[ITEM_PREFIX + NUMERIC_PREFIX + item_numeric_col] = itemDF[item_numeric_col]
itemDF = itemDF.drop(columns = [item_numeric_cols])
itemEsFeatureDF = spark.createDataFrame(itemDF)
itemEsFeatureDF.printSchema()
......@@ -291,7 +274,7 @@ def wilson_ctr(num_pv, num_click):
score = (p + z*z/(2*n) - z*math.sqrt((p*(1.0 - p) + z*z /(4.0*n))/n)) / (1.0 + z*z/n);
return float(score);
def getUserProfileFeature(samples_iEsF_iStatisticF, spark, startDay, endDay):
def getUserProfileFeature(spark, startDay, endDay):
#连接doris_olap库
userProfileFeatureDF = spark.read.jdbc('jdbc:mysql://172.16.30.136:3306/doris_olap', 'user_tag3_portrait', numPartitions = 100,
properties = { 'user': 'doris_olap', 'password': 'bA27hXasdfswuolap', 'driver': 'com.mysql.jdbc.Driver' })
......@@ -305,7 +288,17 @@ def getUserProfileFeature(samples_iEsF_iStatisticF, spark, startDay, endDay):
print(table_query)
userProfileFeatureDF = spark.sql(table_query)
userProfileFeatureDF.show(100, False)
userProfileFeatureDF.cache()
userProfileFeatureDF.show(20, False)
def addOneDay(dt):
    """Advance an ISO date string ('YYYY-MM-DD') by one day, returned as 'YYYYMMDD'."""
    next_day = date.fromisoformat(dt) + timedelta(days=1)
    return next_day.strftime('%Y%m%d')
addOneDay_UDF = F.udf(addOneDay, StringType())
userProfileFeatureDF = userProfileFeatureDF.withColumn('partition_date', addOneDay_UDF('dt')).drop('dt')
userProfileFeatureDF.show(20, False)
return userProfileFeatureDF
def addUserFeatures(samples):
......@@ -842,15 +835,22 @@ def get_item_es_feature_df():
datas.append(data)
print("item size:",len(datas))
itemColumns = ['id','discount', 'case_count', 'sales_count', 'service_type','merchant_id',
'doctor_type', 'doctor_id', 'doctor_famous', 'hospital_id', 'hospital_city_tag_id', 'hospital_type',
'hospital_is_high_quality', 'second_demands','second_solutions', 'second_positions',
'tags_v3','sku_price']
itemColumns = [ITEM_PREFIX + CATEGORY_PREFIX + 'id',ITEM_PREFIX + NUMERIC_PREFIX + 'discount',
ITEM_PREFIX + NUMERIC_PREFIX + 'case_count', ITEM_PREFIX + NUMERIC_PREFIX + 'sales_count',
ITEM_PREFIX + CATEGORY_PREFIX + 'service_type',ITEM_PREFIX + CATEGORY_PREFIX + 'merchant_id',
ITEM_PREFIX + CATEGORY_PREFIX + 'doctor_type', ITEM_PREFIX + CATEGORY_PREFIX + 'doctor_id',
ITEM_PREFIX + CATEGORY_PREFIX + 'doctor_famous', ITEM_PREFIX + CATEGORY_PREFIX + 'hospital_id',
ITEM_PREFIX + CATEGORY_PREFIX + 'hospital_city_tag_id', ITEM_PREFIX + CATEGORY_PREFIX + 'hospital_type',
ITEM_PREFIX + CATEGORY_PREFIX + 'hospital_is_high_quality', ITEM_PREFIX + CATEGORY_PREFIX + 'second_demands',
ITEM_PREFIX + CATEGORY_PREFIX + 'second_solutions', ITEM_PREFIX + CATEGORY_PREFIX + 'second_positions',
ITEM_PREFIX + CATEGORY_PREFIX + 'tags_v3', ITEM_PREFIX + NUMERIC_PREFIX + 'sku_price']
# 'sku_tags','sku_show_tags','sku_price']
itemEsFeatureDF = pd.DataFrame(datas,columns=itemColumns)
print("itemEsFeatureDF.columns: {}".format(itemEsFeatureDF.columns))
print(itemEsFeatureDF.head(10))
itemEsFeatureDF = spark.createDataFrame(itemEsFeatureDF)
itemEsFeatureDF.printSchema()
itemEsFeatureDF.show(10, truncate=False)
return itemEsFeatureDF
......@@ -935,17 +935,13 @@ if __name__ == '__main__':
spark = get_spark("SERVICE_FEATURE_CSV_EXPORT_SK")
spark.sparkContext.setLogLevel("ERROR")
getUserProfileFeature(None, spark, addDays(-trainDays - 1, format = "%Y-%m-%d"), addDays(-1, format = "%Y-%m-%d"))
sys.exit()
userProfileFeatureDF = getUserProfileFeature(spark, addDays(-trainDays - 1, format = "%Y-%m-%d"), addDays(-1, format = "%Y-%m-%d"))
#获取点击曝光数据
clickDF, expDF, ratingDF, startDay, endDay = get_click_exp_rating_df(trainDays, spark)
#item Es Feature
itemEsFeatureDF = get_item_es_feature_df()
#item Es Feature Process
itemEsFeatureDF = itemEsFeaturesProcess(itemEsFeatureDF, spark)
#计算 item 统计特征
clickStaticFeatures, expStaticFeatures = getItemStaticFeatures(itemStatisticStartDays + trainDays, startDay, endDay)
......@@ -954,14 +950,12 @@ if __name__ == '__main__':
.join(expStaticFeatures, on = ["item_id", "partition_date"], how = 'left')\
.join(itemEsFeatureDF, on = ["item_id"], how = 'left')
#item 统计 特征 Process
samples_iEsF_iStatisticF = itemStatisticFeaturesProcess(samples_iEsF_iStatisticF)
#user profile feature
samplesWithUserFeatures = getUserProfileFeature(samples_iEsF_iStatisticF, spark)
#
sys.exit()
# user columns
user_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("user")]
print("collect feature for user:{}".format(str(user_columns)))
......
......@@ -62,6 +62,7 @@ def getWeight(x):
return res
tf.feature_column.bucketized_column()
def getDataSet(df,shuffleSize = 10000,batchSize=128):
# print(df.dtypes)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment