Commit b0896276 authored by 宋柯

模型上线

parent 62c71856
...@@ -153,6 +153,20 @@ def getPredictItemStaticFeatures(itemStatisticDays): ...@@ -153,6 +153,20 @@ def getPredictItemStaticFeatures(itemStatisticDays):
itemStatisticDF = spark.sql(itemStatisticSql) itemStatisticDF = spark.sql(itemStatisticSql)
# itemStatisticDF.show(100, False) # itemStatisticDF.show(100, False)
partitionDatas = generatePredictPartitionDates(itemStatisticDays)
partitionDatasBC = spark.sparkContext.broadcast(partitionDatas)
def splitPatitionDatasFlatMapFunc(row):
    """Expand one aggregated row into one tuple per broadcast partition date.

    ``row`` must expose ``card_id``, ``label`` and
    ``partition_date_label_count_list``; every entry of that list is a
    string of the form ``"<partition_date>_<label_count>"``.

    Returns a list of ``(card_id, partition_date, label, label_count)``
    tuples, one for each date in ``partitionDatasBC.value``. ``label_count``
    is kept as a string, and dates missing from the row get ``'0'``.
    """
    # Split each entry exactly once, anchored on the LAST underscore, so the
    # count is always the trailing field even if the date part contains '_'.
    countsByDate = dict(
        entry.rsplit('_', 1) for entry in row.partition_date_label_count_list
    )
    return [
        (row.card_id, partitionDate, row.label, countsByDate.get(partitionDate, '0'))
        for partitionDate in partitionDatasBC.value
    ]
itemStatisticDF = itemStatisticDF.rdd.flatMap(splitPatitionDatasFlatMapFunc).toDF(["card_id", "partition_date", "label", "label_count"])
itemStatisticDF.createOrReplaceTempView("predictItemStatisticDF") itemStatisticDF.createOrReplaceTempView("predictItemStatisticDF")
...@@ -761,13 +775,16 @@ def getItemStatisticSql(start, end): ...@@ -761,13 +775,16 @@ def getItemStatisticSql(start, end):
def getPredictItemStatisticSql(start, end): def getPredictItemStatisticSql(start, end):
sql = """ sql = """
SELECT TT.card_id, TT.label, count(1) as label_count SELECT TTT.card_id, TTT.label, COLLECT_LIST(CONCAT(TTT.partition_date, '_', TTT.label_count)) partition_date_label_count_list
FROM
(
SELECT TT.partition_date, TT.card_id, TT.label, count(1) as label_count
FROM FROM
( (
SELECT T.card_id, T.label SELECT T.partition_date, T.card_id, T.label
FROM FROM
( (
SELECT t1.card_id, 1 as label SELECT t1.partition_date, t1.card_id, 1 as label
FROM FROM
( (
select cl_id, business_id as card_id select cl_id, business_id as card_id
...@@ -808,7 +825,7 @@ def getPredictItemStatisticSql(start, end): ...@@ -808,7 +825,7 @@ def getPredictItemStatisticSql(start, end):
on t3.device_id=t2.device_id on t3.device_id=t2.device_id
WHERE t3.device_id is null WHERE t3.device_id is null
UNION UNION
SELECT t1.card_id, 0 as label SELECT t1.partition_date, t1.card_id, 0 as label
from from
( --新首页卡片曝光 ( --新首页卡片曝光
SELECT cl_id, card_id SELECT cl_id, card_id
...@@ -854,7 +871,10 @@ def getPredictItemStatisticSql(start, end): ...@@ -854,7 +871,10 @@ def getPredictItemStatisticSql(start, end):
WHERE t3.device_id is null WHERE t3.device_id is null
) T ) T
) TT ) TT
GROUP BY TT.card_id, TT.label GROUP BY TT.partition_date, TT.card_id, TT.label
) TTT
GROUP BY TTT.card_id, TTT.label
""".format(startDay = start,endDay = end) """.format(startDay = start,endDay = end)
print(sql) print(sql)
return sql return sql
...@@ -1010,6 +1030,9 @@ def addDays(n, format="%Y%m%d"): ...@@ -1010,6 +1030,9 @@ def addDays(n, format="%Y%m%d"):
def generatePartitionDates(partitionDates):
    """Return ``addDays`` results for the offsets -1, -2, ..., -partitionDates.

    ``partitionDates`` is the number of entries to produce (despite its
    name it is used as a count, not as a collection of dates).
    """
    # NOTE(review): addDays presumably formats "today + n days" — confirm
    # against its definition; only its call signature is visible here.
    dates = []
    for dayIndex in range(partitionDates):
        dates.append(addDays(-(dayIndex + 1)))
    return dates
def generatePredictPartitionDates(partitionDates):
    """Return ``addDays`` results for the offsets 0, -1, ..., -(partitionDates - 1).

    ``partitionDates`` is the number of entries to produce. Unlike
    ``generatePartitionDates`` the sequence starts at offset 0 rather
    than -1.
    """
    # NOTE(review): addDays presumably formats "today + n days" — confirm
    # against its definition; only its call signature is visible here.
    return list(map(lambda dayIndex: addDays(-dayIndex), range(partitionDates)))
#显示所有列 #显示所有列
pd.set_option('display.max_columns', None) pd.set_option('display.max_columns', None)
#显示所有行 #显示所有行
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment