Commit ffa71992 authored by 宋柯's avatar 宋柯

模型上线

parent ef5bbbd0
...@@ -1046,6 +1046,87 @@ def get_click_exp_start_end_time(trainDays): ...@@ -1046,6 +1046,87 @@ def get_click_exp_start_end_time(trainDays):
print("click_exp_start_end_time: {}, {}".format(startDay, endDay), flush=True) print("click_exp_start_end_time: {}, {}".format(startDay, endDay), flush=True)
return startDay, endDay return startDay, endDay
def get_and_save_card_feature(itemEsFeatureDF, predictClickStaticFeatures, predictExpStaticFeatures, fields_na_value_dict):
    """Build the online serving features for every card and write them to Redis.

    Left-joins the item ES features with the click/exposure static feature
    frames on ``card_id``, fills NA values, then stores one Redis hash entry
    per card: ``card_id -> 'ITEM_*' feature values joined by '|'``.

    Args:
        itemEsFeatureDF: Spark DataFrame of item ES features, keyed by card_id.
        predictClickStaticFeatures: Spark DataFrame of click statistics, keyed by card_id.
        predictExpStaticFeatures: Spark DataFrame of exposure statistics, keyed by card_id.
        fields_na_value_dict: column -> default value used to fill NAs after the joins.
    """
    itemFeature = itemEsFeatureDF.join(predictClickStaticFeatures, on = 'card_id', how = 'left').join(predictExpStaticFeatures, on = 'card_id', how = 'left')
    itemFeature = itemFeature.na.fill(fields_na_value_dict)
    itemFeature.printSchema()
    itemFeatureDF = itemFeature.toPandas()
    conn = getRedisConn()
    BATCH = 5000
    KEY = 'strategy:model:rank:widedeep:service:feature'
    # Only the ITEM_* columns are served online; define before the closure below uses it.
    columns_used = [c for c in itemFeatureDF.columns if c.startswith('ITEM_')]
    print('columns_used: ', columns_used)
    def concat_service_feature(row_series):
        # Serialize the served feature values as a '|'-joined string, in column order.
        return '|'.join(row_series[columns_used].astype(str))
    # Write to Redis in batches to keep each round trip bounded.
    for batch_start in range(0, len(itemFeatureDF), BATCH):
        chunk = itemFeatureDF.iloc[batch_start: batch_start + BATCH]
        mapping = {row['card_id']: concat_service_feature(row) for _, row in chunk.iterrows()}
        # BUG FIX: hmset is deprecated (removed in redis-py >= 4.0);
        # hset with mapping= issues the same HSET command and is supported going forward.
        conn.hset(KEY, mapping=mapping)
def get_and_save_device_feature(spark, fields_na_value_dict, days = 180):
    """Collect active, non-blacklisted devices with card exposures and cache device_id -> os in Redis.

    Queries Hive for devices that had precise card exposures in the last
    ``days`` days, restricted to organically-acquired active devices and
    excluding the abnormal-device blacklist, then writes one Redis hash
    entry per device: ``device_id -> os`` (cl_type).

    Args:
        spark: active SparkSession used to run the Hive SQL.
        fields_na_value_dict: column -> default value used to fill NAs in the result.
        days: lookback window length in days (default 180).
    """
    # BUG FIX: the original bound start=addDays(-1), end=addDays(-days), which made the
    # SQL filter `partition_date >= yesterday AND partition_date <= <days ago>` — an
    # empty range. The window must run from `days` days ago up to yesterday.
    startDay = addDays(-days)
    endDay = addDays(-1)
    sql = """
        SELECT t1.cl_id device_id, cl_type as os
        from
        ( --新首页卡片曝光
            SELECT cl_type,cl_id
            FROM online.ml_community_precise_exposure_detail
            where partition_date>='{startDay}' and partition_date<='{endDay}'
            and action in ('page_precise_exposure','home_choiceness_card_exposure')
            and cl_id IS NOT NULL
            and card_id IS NOT NULL
            and cl_type IS NOT NULL
            and cl_type <> ''
            and is_exposure = '1'
            --and page_name='home'
            --and tab_name='精选'
            --and page_name in ('home','search_result_more')
            --and ((page_name='home' and tab_name='精选') or (page_name='category' and tab_name = '商品'))
            and card_type in ('card','video')
            and (get_json_object(exposure_card,'$.in_page_pos') is null or get_json_object(exposure_card,'$.in_page_pos') != 'seckill')
            group by cl_type, cl_id
        ) t1
        join
        (	--渠道,新老
            SELECT distinct device_id
            FROM online.ml_device_day_active_status
            where partition_date>='{startDay}' and partition_date<='{endDay}'
            AND active_type in ('1','2','4')
            and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
                ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
                ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
                ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
                ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
                ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
                ,'promotion_shike','promotion_julang_jl03','promotion_zuimei','','unknown')
            AND first_channel_source_type not like 'promotion\_jf\_%'
        ) t2
        on t1.cl_id = t2.device_id
        LEFT JOIN
        (	--去除黑名单
            select distinct device_id
            from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
            where PARTITION_DAY =regexp_replace(DATE_SUB(current_date,1) ,'-','')
            AND is_abnormal_device = 'true'
        )t3
        on t3.device_id=t2.device_id
        WHERE t3.device_id is null
    """.format(startDay=startDay, endDay=endDay)
    print(sql)
    device_feature_df = spark.sql(sql).toPandas()
    # BUG FIX: `.na.fill` is the Spark DataFrame API; after `.toPandas()` this object is a
    # pandas DataFrame and `.na` raises AttributeError. Use pandas' fillna instead.
    device_feature_df = device_feature_df.fillna(fields_na_value_dict)
    conn = getRedisConn()
    BATCH = 5000
    KEY = 'strategy:model:rank:widedeep:device:feature'
    # Write to Redis in batches; loop variable renamed so it no longer shadows the
    # date variable the original reused as `start`.
    for batch_start in range(0, len(device_feature_df), BATCH):
        chunk = device_feature_df.iloc[batch_start: batch_start + BATCH]
        # BUG FIX: hmset is deprecated (removed in redis-py >= 4.0); hset(mapping=) is equivalent.
        conn.hset(KEY, mapping={row['device_id']: row['os'] for _, row in chunk.iterrows()})
def get_click_exp_rating_df(trainDays, spark): def get_click_exp_rating_df(trainDays, spark):
#行为数据的开始结束日期 #行为数据的开始结束日期
startDay, endDay = get_click_exp_start_end_time(trainDays) startDay, endDay = get_click_exp_start_end_time(trainDays)
...@@ -1183,6 +1264,40 @@ if __name__ == '__main__': ...@@ -1183,6 +1264,40 @@ if __name__ == '__main__':
samples.printSchema() samples.printSchema()
# root
# | -- ITEM_CATEGORY_card_id: string(nullable=false)
# | -- partition_date: string(nullable=true)
# | -- USER_CATEGORY_device_id: string(nullable=false)
# | -- USER_CATEGORY_os: string(nullable=false)
# | -- USER_CATEGORY_user_city_id: string(nullable=false)
# | -- label: integer(nullable=false)
# | -- USER_MULTI_CATEGORY_second_solutions: string(nullable=false)
# | -- USER_MULTI_CATEGORY_second_demands: string(nullable=false)
# | -- USER_MULTI_CATEGORY_second_positions: string(nullable=false)
# | -- USER_MULTI_CATEGORY_projects: string(nullable=false)
# | -- ITEM_NUMERIC_click_count_sum: double(nullable=false)
# | -- ITEM_NUMERIC_click_count_avg: double(nullable=false)
# | -- ITEM_NUMERIC_click_count_stddev: double(nullable=false)
# | -- ITEM_NUMERIC_exp_count_sum: double(nullable=false)
# | -- ITEM_NUMERIC_exp_count_avg: double(nullable=false)
# | -- ITEM_NUMERIC_exp_count_stddev: double(nullable=false)
# | -- ITEM_NUMERIC_discount: double(nullable=false)
# | -- ITEM_NUMERIC_case_count: long(nullable=false)
# | -- ITEM_NUMERIC_sales_count: long(nullable=false)
# | -- ITEM_CATEGORY_service_type: string(nullable=false)
# | -- ITEM_CATEGORY_merchant_id: string(nullable=false)
# | -- ITEM_CATEGORY_doctor_type: string(nullable=false)
# | -- ITEM_CATEGORY_doctor_id: string(nullable=false)
# | -- ITEM_CATEGORY_doctor_famous: string(nullable=false)
# | -- ITEM_CATEGORY_hospital_id: string(nullable=false)
# | -- ITEM_CATEGORY_hospital_city_tag_id: string(nullable=false)
# | -- ITEM_CATEGORY_hospital_type: string(nullable=false)
# | -- ITEM_CATEGORY_hospital_is_high_quality: string(nullable=false)
# | -- ITEM_MULTI_CATEGORY_second_demands: string(nullable=false)
# | -- ITEM_MULTI_CATEGORY_second_solutions: string(nullable=false)
# | -- ITEM_MULTI_CATEGORY_second_positions: string(nullable=false)
# | -- ITEM_MULTI_CATEGORY_projects: string(nullable=false)
# | -- ITEM_NUMERIC_sku_price: double(nullable=false)
test_samples = samples.where("partition_date = '{}'".format(endDay)) test_samples = samples.where("partition_date = '{}'".format(endDay))
train_samples = samples.where("partition_date <> '{}'".format(endDay)) train_samples = samples.where("partition_date <> '{}'".format(endDay))
...@@ -1221,12 +1336,25 @@ if __name__ == '__main__': ...@@ -1221,12 +1336,25 @@ if __name__ == '__main__':
#存入线上预测特征 #存入线上预测特征
# card_id | ITEM_NUMERIC_click_count_sum | ITEM_NUMERIC_click_count_avg | ITEM_NUMERIC_click_count_stddev # card_id | ITEM_NUMERIC_click_count_sum | ITEM_NUMERIC_click_count_avg | ITEM_NUMERIC_click_count_stddev
predictClickStaticDF = predictClickStaticFeatures.toPandas() # predictClickStaticDF = predictClickStaticFeatures.toPandas()
# card_id | ITEM_NUMERIC_exp_count_sum | ITEM_NUMERIC_exp_count_avg | ITEM_NUMERIC_exp_count_stddev # card_id | ITEM_NUMERIC_exp_count_sum | ITEM_NUMERIC_exp_count_avg | ITEM_NUMERIC_exp_count_stddev
predictExpStaticDF = predictExpStaticFeatures.toPandas() # predictExpStaticDF = predictExpStaticFeatures.toPandas()
#
#ITEM_CATEGORY_card_id,partition_date,USER_CATEGORY_device_id,USER_CATEGORY_os,USER_CATEGORY_user_city_id,label,
#USER_MULTI_CATEGORY_second_solutions,USER_MULTI_CATEGORY_second_demands,USER_MULTI_CATEGORY_second_positions,
#USER_MULTI_CATEGORY_projects,
#ITEM_NUMERIC_click_count_sum,ITEM_NUMERIC_click_count_avg,ITEM_NUMERIC_click_count_stddev,
#ITEM_NUMERIC_exp_count_sum,ITEM_NUMERIC_exp_count_avg,ITEM_NUMERIC_exp_count_stddev,ITEM_NUMERIC_discount,ITEM_NUMERIC_case_count,
#ITEM_NUMERIC_sales_count,ITEM_CATEGORY_service_type,ITEM_CATEGORY_merchant_id,ITEM_CATEGORY_doctor_type,
#ITEM_CATEGORY_doctor_id,ITEM_CATEGORY_doctor_famous,ITEM_CATEGORY_hospital_id,ITEM_CATEGORY_hospital_city_tag_id,
#ITEM_CATEGORY_hospital_type,ITEM_CATEGORY_hospital_is_high_quality,ITEM_MULTI_CATEGORY_second_demands,
#ITEM_MULTI_CATEGORY_second_solutions,ITEM_MULTI_CATEGORY_second_positions,ITEM_MULTI_CATEGORY_projects,ITEM_NUMERIC_sku_price
#存device_id -> USER_CATEGORY_os,
get_and_save_device_feature(spark, fields_na_value_dict)
#存card_id -> ITEM*
get_and_save_card_feature(itemEsFeatureDF, predictClickStaticFeatures, predictExpStaticFeatures, fields_na_value_dict)
print("总耗时:{} mins".format((time.time() - start)/60)) print("总耗时:{} mins".format((time.time() - start)/60))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment