Commit 8ff8d21d authored by 宋柯's avatar 宋柯

模型上线

parent e0f282db
......@@ -146,6 +146,49 @@ def getItemStaticFeatures(itemStatisticDays, startDay, endDay):
return clickStaticFeatures, expStaticFeatures
def getPredictItemStaticFeatures(itemStatisticDays):
    """Compute per-card statistic features (sum/avg/stddev of daily label counts)
    used at online-inference time.

    The statistic window is the last ``itemStatisticDays`` days ending yesterday.
    Returns a pair ``(clickStaticFeatures, expStaticFeatures)`` of DataFrames,
    one built from click rows (label == 1) and one from exposure rows
    (label == 0), each keyed by ``card_id`` with the ``label`` column dropped.

    NOTE(review): SOURCE is a diff with indentation stripped; indentation here
    is reconstructed.
    """
    # Window: [today - itemStatisticDays, yesterday].
    windowStart = addDays(-itemStatisticDays)
    windowEnd = addDays(-1)

    # Raw per-(card_id, label) counts produced by the predict-time SQL.
    rawCountDF = spark.sql(getPredictItemStatisticSql(windowStart, windowEnd))
    # rawCountDF.show(100, False)
    rawCountDF.createOrReplaceTempView("predictItemStatisticDF")

    aggSql = """
        SELECT
            card_id,
            label,
            COALESCE(SUM(label_count), 0) label_count_sum,
            COALESCE(AVG(label_count), 0) label_count_avg,
            COALESCE(STDDEV_POP(label_count), 0) label_count_stddev
        FROM
            predictItemStatisticDF
        GROUP BY
            card_id, label
    """
    print("predictItemStatisticSql: {}".format(aggSql))
    staticFeatures = spark.sql(aggSql)

    def _toFeatureDF(labelValue, tag):
        # Filter one label slice, prefix its stat columns, and drop 'label'.
        df = staticFeatures.where(F.col('label') == F.lit(labelValue))
        for stat in ('sum', 'avg', 'stddev'):
            df = df.withColumnRenamed(
                'label_count_' + stat,
                ITEM_PREFIX + NUMERIC_PREFIX + tag + '_count_' + stat)
        return df.drop('label')

    clickStaticFeatures = _toFeatureDF(1, 'click')
    # clickStaticFeatures.show(20, truncate = False)
    expStaticFeatures = _toFeatureDF(0, 'exp')
    # expStaticFeatures.show(20, truncate = False)
    return clickStaticFeatures, expStaticFeatures
# ratingDF, itemEsFeatureDF, startDay, endDay
def itemStatisticFeaturesProcess(samples_iEsF_iStatisticF):
......@@ -716,6 +759,106 @@ def getItemStatisticSql(start, end):
print(sql)
return sql
def getPredictItemStatisticSql(start, end):
    """Build the predict-time SQL that counts, per (card_id, label), the
    deduplicated user-day click (label=1) and exposure (label=0) events
    between ``start`` and ``end`` (inclusive, partition_date strings).

    Both event sources are joined to the active-device whitelist (channel
    filter) and anti-joined against the abnormal-device blacklist.

    BUG FIX (review): the original wrapper selected ``T.partition_date`` but
    neither UNION branch nor the inner subqueries projected that column, so
    the query failed to resolve. ``partition_date`` and ``cl_id`` are now
    threaded through ``t1`` and the UNION branches; UNION therefore dedupes
    at (partition_date, cl_id, card_id, label) granularity, i.e. one row per
    user per card per day — presumably the intended event count; verify
    against the training-side getItemStatisticSql.

    Returns the formatted SQL string (also printed for job logs).
    """
    sql = """
        SELECT TT.card_id, TT.label, count(1) as label_count
        FROM
        (
            SELECT T.partition_date, T.card_id, T.label
            FROM
            (
                SELECT t1.partition_date, t1.cl_id, t1.card_id, 1 as label
                FROM
                (   --详情页点击（停留>=2s）
                    select partition_date, cl_id, business_id as card_id
                    from online.bl_hdfs_maidian_updates
                    where action = 'page_view'
                    AND partition_date>='{startDay}' and partition_date<='{endDay}'
                    AND page_name='welfare_detail'
                    AND page_stay >= 2
                    AND cl_id is not null
                    AND cl_id != ''
                    AND business_id is not null
                    AND business_id != ''
                    group by partition_date, city_id, cl_id, business_id, cl_type
                ) AS t1
                join
                (   --渠道,新老
                    SELECT distinct device_id
                    FROM online.ml_device_day_active_status
                    where partition_date>='{startDay}' and partition_date<='{endDay}'
                    AND active_type in ('1','2','4')
                    and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
                        ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
                        ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
                        ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
                        ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
                        ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
                        ,'promotion_shike','promotion_julang_jl03','promotion_zuimei','','unknown')
                    AND first_channel_source_type not like 'promotion\_jf\_%'
                ) t2
                on t1.cl_id = t2.device_id
                LEFT JOIN
                (   --去除黑名单
                    select distinct device_id
                    from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
                    where PARTITION_DAY =regexp_replace(DATE_SUB(current_date,1) ,'-','')
                    AND is_abnormal_device = 'true'
                ) t3
                on t3.device_id=t2.device_id
                WHERE t3.device_id is null
                UNION
                SELECT t1.partition_date, t1.cl_id, t1.card_id, 0 as label
                from
                (   --新首页卡片曝光
                    SELECT partition_date, cl_id, card_id
                    FROM online.ml_community_precise_exposure_detail
                    where partition_date>='{startDay}' and partition_date<='{endDay}'
                    and action in ('page_precise_exposure','home_choiceness_card_exposure')
                    and cl_id IS NOT NULL
                    and card_id IS NOT NULL
                    and is_exposure='1'
                    --and page_name='home'
                    --and tab_name='精选'
                    --and page_name in ('home','search_result_more')
                    --and ((page_name='home' and tab_name='精选') or (page_name='category' and tab_name = '商品'))
                    and card_type in ('card','video')
                    and card_content_type in ('service')
                    and (get_json_object(exposure_card,'$.in_page_pos') is null or get_json_object(exposure_card,'$.in_page_pos') != 'seckill')
                    group by partition_date, city_id, cl_type, cl_id, card_id, app_session_id
                ) t1
                join
                (   --渠道,新老
                    SELECT distinct device_id
                    FROM online.ml_device_day_active_status
                    where partition_date>='{startDay}' and partition_date<='{endDay}'
                    AND active_type in ('1','2','4')
                    and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
                        ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
                        ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
                        ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
                        ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
                        ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
                        ,'promotion_shike','promotion_julang_jl03','promotion_zuimei','','unknown')
                    AND first_channel_source_type not like 'promotion\_jf\_%'
                ) t2
                on t1.cl_id = t2.device_id
                LEFT JOIN
                (   --去除黑名单
                    select distinct device_id
                    from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
                    where PARTITION_DAY =regexp_replace(DATE_SUB(current_date,1) ,'-','')
                    AND is_abnormal_device = 'true'
                ) t3
                on t3.device_id=t2.device_id
                WHERE t3.device_id is null
            ) T
        ) TT
        GROUP BY TT.card_id, TT.label
    """.format(startDay = start, endDay = end)
    print(sql)
    return sql
def connectDoris(spark, table):
return spark.read \
.format("jdbc") \
......@@ -945,6 +1088,12 @@ if __name__ == '__main__':
#计算 item 统计特征
clickStaticFeatures, expStaticFeatures = getItemStaticFeatures(itemStatisticStartDays + trainDays + 1, startDay, endDay)
#计算线上推理 item 统计特征
predictClickStaticFeatures, predictExpStaticFeatures = getPredictItemStaticFeatures(itemStatisticStartDays)
predictClickStaticFeatures.show(100, False)
predictExpStaticFeatures.show(100, False)
#user Profile Feature
userProfileFeatureDF = getUserProfileFeature(spark, addDays(-trainDays - 1, format = "%Y-%m-%d"), addDays(-1, format = "%Y-%m-%d"))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment