Commit c463b952 authored by 宋柯

模型调试

parent c2c2f949
...@@ -146,11 +146,7 @@ def getItemStaticFeatures(itemStatisticDays, startDay, endDay): ...@@ -146,11 +146,7 @@ def getItemStaticFeatures(itemStatisticDays, startDay, endDay):
return clickStaticFeatures, expStaticFeatures return clickStaticFeatures, expStaticFeatures
# ratingDF, itemEsFeatureDF, startDay, endDay # ratingDF, itemEsFeatureDF, startDay, endDay
def addItemFeatures(samples, itemEsFeatureDF, clickStaticFeatures, expStaticFeatures): def itemStatisticFeaturesProcess(samples_iEsF_iStatisticF):
samples_iEsF_iStatisticF = samples.join(clickStaticFeatures, on = ["item_id", "partition_date"], how = 'left')\
.join(expStaticFeatures, on = ["item_id", "partition_date"], how = 'left')\
.join(itemEsFeatureDF, on = ["item_id"], how = 'left')
# 连续特征分桶 # 连续特征分桶
bucket_suffix = "_Bucket" bucket_suffix = "_Bucket"
...@@ -166,6 +162,8 @@ def addItemFeatures(samples, itemEsFeatureDF, clickStaticFeatures, expStaticFeat ...@@ -166,6 +162,8 @@ def addItemFeatures(samples, itemEsFeatureDF, clickStaticFeatures, expStaticFeat
new_col = col + number_suffix new_col = col + number_suffix
samples_iEsF_iStatisticF = samples_iEsF_iStatisticF.withColumn(new_col, F.when(F.col(col).isNull(), 0).otherwise(1 / (F.col(col) + 1))).drop(col) samples_iEsF_iStatisticF = samples_iEsF_iStatisticF.withColumn(new_col, F.when(F.col(col).isNull(), 0).otherwise(1 / (F.col(col) + 1))).drop(col)
samples_iEsF_iStatisticF.show(50, truncate=False)
return samples_iEsF_iStatisticF return samples_iEsF_iStatisticF
def addUserStaticsFeatures(samples,dataVocab): def addUserStaticsFeatures(samples,dataVocab):
...@@ -214,7 +212,10 @@ def flatten(items): ...@@ -214,7 +212,10 @@ def flatten(items):
else: else:
yield x yield x
def itemEsFeaturesProcess(itemDF): def itemEsFeaturesProcess(itemDF, spark):
print("item es 特征工程 ")
item_es_feature_start_time = int(round(time.time()))
onehot_cols = ['id', 'service_type', 'merchant_id', 'doctor_type', 'doctor_id', 'doctor_famous', 'hospital_id', 'hospital_city_tag_id', 'hospital_type', 'hospital_is_high_quality'] onehot_cols = ['id', 'service_type', 'merchant_id', 'doctor_type', 'doctor_id', 'doctor_famous', 'hospital_id', 'hospital_city_tag_id', 'hospital_type', 'hospital_is_high_quality']
multi_cols = ['tags_v3', 'second_demands', 'second_solutions', 'second_positions'] multi_cols = ['tags_v3', 'second_demands', 'second_solutions', 'second_positions']
...@@ -247,7 +248,14 @@ def itemEsFeaturesProcess(itemDF): ...@@ -247,7 +248,14 @@ def itemEsFeaturesProcess(itemDF):
itemDF[ITEM_PREFIX + col + number_suffix] = itemDF[col] itemDF[ITEM_PREFIX + col + number_suffix] = itemDF[col]
itemDF = itemDF.drop(columns=[col]) itemDF = itemDF.drop(columns=[col])
return itemDF itemEsFeatureDF = spark.createDataFrame(itemDF)
itemEsFeatureDF.printSchema()
itemEsFeatureDF.show(10, truncate=False)
item_es_feature_end_time = int(round(time.time()))
print("item es 特征工程, 耗时: {}s".format(item_es_feature_end_time - item_es_feature_start_time))
return itemEsFeatureDF
def extractTags(genres_list): def extractTags(genres_list):
# 根据点击列表顺序加权 # 根据点击列表顺序加权
...@@ -805,7 +813,7 @@ def parseSource(_source): ...@@ -805,7 +813,7 @@ def parseSource(_source):
return data return data
# es中获取特征 # es中获取特征
def get_service_feature_df(): def get_item_es_feature_df():
es_columns = ["id","discount", "sales_count", "doctor", "case_count", "service_type","merchant_id","second_demands", "second_solutions", "second_positions", "sku_list","tags_v3"] es_columns = ["id","discount", "sales_count", "doctor", "case_count", "service_type","merchant_id","second_demands", "second_solutions", "second_positions", "sku_list","tags_v3"]
query = init_es_query() query = init_es_query()
query["_source"]["includes"] = es_columns query["_source"]["includes"] = es_columns
...@@ -826,6 +834,10 @@ def get_service_feature_df(): ...@@ -826,6 +834,10 @@ def get_service_feature_df():
'tags_v3','sku_price'] 'tags_v3','sku_price']
# 'sku_tags','sku_show_tags','sku_price'] # 'sku_tags','sku_show_tags','sku_price']
df = pd.DataFrame(datas,columns=itemColumns) df = pd.DataFrame(datas,columns=itemColumns)
print("itemEsFeatureDF.columns: {}".format(itemEsFeatureDF.columns))
print(itemEsFeatureDF.head(10))
return df return df
def addDays(n, format="%Y%m%d"): def addDays(n, format="%Y%m%d"):
...@@ -841,23 +853,17 @@ pd.set_option('display.max_rows', None) ...@@ -841,23 +853,17 @@ pd.set_option('display.max_rows', None)
#设置value的显示长度为100,默认为50 #设置value的显示长度为100,默认为50
pd.set_option('max_colwidth',100) pd.set_option('max_colwidth',100)
if __name__ == '__main__': def get_click_exp_start_end_time(trainDays):
start = time.time()
#入参
trainDays = int(sys.argv[1])
itemStatisticStartDays = int(sys.argv[2])
print('trainDays:{}'.format(trainDays),flush=True)
#行为数据的开始结束日期
endDay = addDays(0)
startDay = addDays(-int(trainDays)) startDay = addDays(-int(trainDays))
endDay = addDays(0)
print("click_exp_start_end_time: {}, {}".format(startDay, endDay), flush=True)
return startDay, endDay
#item特征统计行为数据的开始结束日期 def get_click_exp_rating_df(trainDays, spark):
spark = get_spark("SERVICE_FEATURE_CSV_EXPORT_SK") #行为数据的开始结束日期
spark.sparkContext.setLogLevel("ERROR") startDay, endDay = get_click_exp_start_end_time(trainDays)
#获取行为数据 #获取曝光和点击行为数据
clickSql = getClickSql(startDay,endDay) clickSql = getClickSql(startDay,endDay)
expSql = getExposureSql(startDay,endDay) expSql = getExposureSql(startDay,endDay)
clickDF = spark.sql(clickSql) clickDF = spark.sql(clickSql)
...@@ -868,6 +874,8 @@ if __name__ == '__main__': ...@@ -868,6 +874,8 @@ if __name__ == '__main__':
expDF = spark.sql(expSql) expDF = spark.sql(expSql)
expDF.createOrReplaceTempView("expDF") expDF.createOrReplaceTempView("expDF")
expDF.cache() expDF.cache()
#曝光数据过滤掉点击数据
print("expDF 过滤点击数据前 count: ", expDF.count()) print("expDF 过滤点击数据前 count: ", expDF.count())
expDF = spark.sql(""" expDF = spark.sql("""
SELECT t1.partition_date, t1.device_id, t1.card_id, t1.time_stamp, t1.os, t1.user_city_id SELECT t1.partition_date, t1.device_id, t1.card_id, t1.time_stamp, t1.os, t1.user_city_id
...@@ -882,6 +890,7 @@ if __name__ == '__main__': ...@@ -882,6 +890,7 @@ if __name__ == '__main__':
""") """)
print("expDF 过滤点击数据后 count: ", expDF.count()) print("expDF 过滤点击数据后 count: ", expDF.count())
#添加label并且规范字段命名
clickDF = clickDF.withColumn("label", F.lit(1)) clickDF = clickDF.withColumn("label", F.lit(1))
expDF = expDF.withColumn("label", F.lit(0)) expDF = expDF.withColumn("label", F.lit(0))
ratingDF = clickDF.union(expDF) ratingDF = clickDF.union(expDF)
...@@ -892,34 +901,46 @@ if __name__ == '__main__': ...@@ -892,34 +901,46 @@ if __name__ == '__main__':
.withColumnRenamed("os", "user_os")\ .withColumnRenamed("os", "user_os")\
.withColumn("user_city_id", F.when(F.col("user_city_id").isNull(), "-1").otherwise(F.col("user_city_id")))\ .withColumn("user_city_id", F.when(F.col("user_city_id").isNull(), "-1").otherwise(F.col("user_city_id")))\
.withColumn("timestamp",F.col("timestamp").cast("long")) .withColumn("timestamp",F.col("timestamp").cast("long"))
ratingDF.cache() ratingDF.cache()
print("ratingDF.columns: {}".format(ratingDF.columns)) print("ratingDF.columns: {}".format(ratingDF.columns))
print(ratingDF.show(100, truncate=False)) print(ratingDF.show(100, truncate=False))
expDF.unpersist(True) expDF.unpersist(True)
clickDF.unpersist(True) clickDF.unpersist(True)
#item Es Feature return clickDF, expDF, ratingDF, startDay, endDay
itemEsFeatureDF = get_service_feature_df()
print("itemEsFeatureDF.columns: {}".format(itemEsFeatureDF.columns))
print(itemEsFeatureDF.head(10))
print("item es 特征工程") if __name__ == '__main__':
item_es_feature_start_time = int(round(time.time()))
itemEsFeatureDF = itemEsFeaturesProcess(itemEsFeatureDF)
item_es_feature_end_time = int(round(time.time()))
print("item es 特征工程, 耗时: {}s".format(item_es_feature_end_time - item_es_feature_start_time))
itemEsFeatureDF = spark.createDataFrame(itemEsFeatureDF) start = time.time()
itemEsFeatureDF.printSchema() #入参
itemEsFeatureDF.show(10, truncate=False) trainDays = int(sys.argv[1])
itemStatisticStartDays = int(sys.argv[2])
print('trainDays:{}'.format(trainDays),flush=True)
spark = get_spark("SERVICE_FEATURE_CSV_EXPORT_SK")
spark.sparkContext.setLogLevel("ERROR")
#获取点击曝光数据
clickDF, expDF, ratingDF, startDay, endDay = get_click_exp_rating_df(trainDays, spark)
#item Es Feature
itemEsFeatureDF = get_item_es_feature_df()
#item Es Feature Process
itemEsFeatureDF = itemEsFeaturesProcess(itemEsFeatureDF, spark)
#计算 item 统计特征
clickStaticFeatures, expStaticFeatures = getItemStaticFeatures(itemStatisticStartDays + trainDays, startDay, endDay) clickStaticFeatures, expStaticFeatures = getItemStaticFeatures(itemStatisticStartDays + trainDays, startDay, endDay)
# item统计特征处理 #样本添加 item es feature 和 item 统计 特征
samples_iEsF_iStatisticF = addItemFeatures(ratingDF, itemEsFeatureDF, clickStaticFeatures, expStaticFeatures) samples_iEsF_iStatisticF = ratingDF.join(clickStaticFeatures, on = ["item_id", "partition_date"], how = 'left')\
samples_iEsF_iStatisticF.show(50, truncate=False) .join(expStaticFeatures, on = ["item_id", "partition_date"], how = 'left')\
.join(itemEsFeatureDF, on = ["item_id"], how = 'left')
samples_iEsF_iStatisticF = itemStatisticFeaturesProcess(samples_iEsF_iStatisticF)
sys.exit(1) sys.exit(1)
# 统计数据处理 # 统计数据处理
# ratingSamplesWithLabel = addStaticsFeatures(ratingSamplesWithLabel,dataVocab) # ratingSamplesWithLabel = addStaticsFeatures(ratingSamplesWithLabel,dataVocab)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment