Commit 5d416d18 authored by 宋柯

模型调试

parent 7fb19aef
......@@ -112,12 +112,11 @@ def getItemStaticFeatures(itemStatisticDays, startDay, endDay):
return res
itemStatisticDF = itemStatisticDF.rdd.flatMap(splitPatitionDatasFlatMapFunc).toDF(["card_id", "partition_date", "label", "label_count"])
itemStatisticDF.orderBy(['card_id', 'label', 'partition_date'])
itemStatisticDF.createOrReplaceTempView("itemStatisticDF")
itemStatisticSql = """
SELECT
card_id as item_id,
card_id,
label,
partition_date,
label_count,
......@@ -281,21 +280,20 @@ def getUserProfileFeature(spark, startDay, endDay):
userProfileFeatureDF.createOrReplaceTempView("userProfileFeatureDF")
table_query = """
select date as dt, cl_id as device_id, first_solutions, second_solutions, first_demands, second_demands, first_positions, second_positions, projects
select date as dt, cl_id as device_id, second_solutions, second_demands, second_positions, projects
from userProfileFeatureDF
where date >= '{startDay}' and date <= '{endDay}'
""".format(startDay = startDay, endDay = endDay)
print(table_query)
userProfileFeatureDF = spark.sql(table_query)
userProfileFeatureDF.cache()
userProfileFeatureDF.show(20, False)
def addOneDay(dt):
return (date.fromisoformat(dt) + timedelta(days = 1)).strftime('%Y%m%d')
addOneDay_UDF = F.udf(addOneDay, StringType())
userProfileFeatureDF = userProfileFeatureDF.withColumn('partition_date', addOneDay_UDF('dt')).drop('dt')
userProfileFeatureDF.cache()
userProfileFeatureDF.show(20, False)
return userProfileFeatureDF
......@@ -833,9 +831,9 @@ def get_item_es_feature_df():
_source = res['_source']
data = parseSource(_source)
datas.append(data)
print("item size:",len(datas))
print("card size:",len(datas))
itemColumns = [ITEM_PREFIX + CATEGORY_PREFIX + 'id',ITEM_PREFIX + NUMERIC_PREFIX + 'discount',
itemColumns = ['card_id', ITEM_PREFIX + NUMERIC_PREFIX + 'discount',
ITEM_PREFIX + NUMERIC_PREFIX + 'case_count', ITEM_PREFIX + NUMERIC_PREFIX + 'sales_count',
ITEM_PREFIX + CATEGORY_PREFIX + 'service_type',ITEM_PREFIX + CATEGORY_PREFIX + 'merchant_id',
ITEM_PREFIX + CATEGORY_PREFIX + 'doctor_type', ITEM_PREFIX + CATEGORY_PREFIX + 'doctor_id',
......@@ -843,8 +841,8 @@ def get_item_es_feature_df():
ITEM_PREFIX + CATEGORY_PREFIX + 'hospital_city_tag_id', ITEM_PREFIX + CATEGORY_PREFIX + 'hospital_type',
ITEM_PREFIX + CATEGORY_PREFIX + 'hospital_is_high_quality', ITEM_PREFIX + CATEGORY_PREFIX + 'second_demands',
ITEM_PREFIX + CATEGORY_PREFIX + 'second_solutions', ITEM_PREFIX + CATEGORY_PREFIX + 'second_positions',
ITEM_PREFIX + CATEGORY_PREFIX + 'tags_v3', ITEM_PREFIX + NUMERIC_PREFIX + 'sku_price']
# 'sku_tags','sku_show_tags','sku_price']
ITEM_PREFIX + CATEGORY_PREFIX + 'projects', ITEM_PREFIX + NUMERIC_PREFIX + 'sku_price']
itemEsFeatureDF = pd.DataFrame(datas,columns=itemColumns)
itemEsFeatureDF = spark.createDataFrame(itemEsFeatureDF)
......@@ -909,16 +907,12 @@ def get_click_exp_rating_df(trainDays, spark):
expDF = expDF.withColumn("label", F.lit(0))
ratingDF = clickDF.union(expDF)
ratingDF = ratingDF.withColumnRenamed("time_stamp", "timestamp")\
.withColumnRenamed("device_id", "userid")\
.withColumnRenamed("card_id", "item_id")\
.withColumnRenamed("page_stay", "rating")\
.withColumnRenamed("os", "user_os")\
.withColumn("user_city_id", F.when(F.col("user_city_id").isNull(), "-1").otherwise(F.col("user_city_id")))\
.withColumn("timestamp",F.col("timestamp").cast("long"))
ratingDF.cache()
print("ratingDF.columns: {}".format(ratingDF.columns))
print(ratingDF.show(100, truncate=False))
print(ratingDF.show(20, truncate=False))
expDF.unpersist(True)
clickDF.unpersist(True)
......@@ -935,7 +929,6 @@ if __name__ == '__main__':
spark = get_spark("SERVICE_FEATURE_CSV_EXPORT_SK")
spark.sparkContext.setLogLevel("ERROR")
userProfileFeatureDF = getUserProfileFeature(spark, addDays(-trainDays - 1, format = "%Y-%m-%d"), addDays(-1, format = "%Y-%m-%d"))
#获取点击曝光数据
clickDF, expDF, ratingDF, startDay, endDay = get_click_exp_rating_df(trainDays, spark)
......@@ -945,16 +938,16 @@ if __name__ == '__main__':
#计算 item 统计特征
clickStaticFeatures, expStaticFeatures = getItemStaticFeatures(itemStatisticStartDays + trainDays, startDay, endDay)
#样本添加 item es feature 和 item 统计 特征
samples_iEsF_iStatisticF = ratingDF.join(clickStaticFeatures, on = ["item_id", "partition_date"], how = 'left')\
.join(expStaticFeatures, on = ["item_id", "partition_date"], how = 'left')\
.join(itemEsFeatureDF, on = ["item_id"], how = 'left')
#user profile feature
samplesWithUserFeatures = getUserProfileFeature(samples_iEsF_iStatisticF, spark)
#user Profile Feature
userProfileFeatureDF = getUserProfileFeature(spark, addDays(-trainDays - 1, format = "%Y-%m-%d"), addDays(-1, format = "%Y-%m-%d"))
#样本添加 item es feature 和 item 统计 特征
samples = ratingDF.join(userProfileFeatureDF, on = ['device_id', "partition_date"], how = 'left')\
.join(clickStaticFeatures, on = ["card_id", "partition_date"], how = 'left')\
.join(expStaticFeatures, on = ["card_id", "partition_date"], how = 'left')\
.join(itemEsFeatureDF, on = ["card_id"], how = 'left')
samples.show(20, False)
sys.exit()
# user columns
user_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("user")]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment