Commit eb269e5e authored by 宋柯's avatar 宋柯

模型bug修复

parent fa8c704c
...@@ -928,7 +928,7 @@ def addDays(n, format="%Y%m%d"): ...@@ -928,7 +928,7 @@ def addDays(n, format="%Y%m%d"):
return (date.today() + timedelta(days=n)).strftime(format) return (date.today() + timedelta(days=n)).strftime(format)
def generatePartitionDates(trainDays):
    """Return partition-date strings for the most recent `trainDays` days.

    Produces one string per day, starting with today (offset 0) and going
    back `trainDays - 1` days, formatted by addDays' default "%Y%m%d".
    """
    # Delegate to addDays so the date format lives in one place; the older
    # inline strftime variant referenced an undefined `format` name.
    return [addDays(-trainDay) for trainDay in range(trainDays)]
#显示所有列 #显示所有列
pd.set_option('display.max_columns', None) pd.set_option('display.max_columns', None)
...@@ -970,8 +970,19 @@ if __name__ == '__main__': ...@@ -970,8 +970,19 @@ if __name__ == '__main__':
partitionDatas = generatePartitionDates(trainDays) partitionDatas = generatePartitionDates(trainDays)
partitionDatasBC = spark.sparkContext.broadcast(partitionDatas) partitionDatasBC = spark.sparkContext.broadcast(partitionDatas)
# Expand each item-statistics row into one row per training partition date,
# filling dates with no recorded activity with a zero count.
def splitPatitionDatasFlatMapFunc(row):
    """Flat-map one row into (card_id, partition_date, label, label_count)
    tuples — one per date in the broadcast partition-date list.

    Expects `row.partition_date_label_count_list` to hold strings shaped
    like "<partition_date>_<count>" (e.g. "20220101_5") — TODO confirm
    against the upstream SQL. Dates absent from the list yield count 0.
    """
    card_id = row.card_id
    label = row.label
    # Split each "<date>_<count>" entry exactly once and cast the count to
    # int: the original kept counts as strings while defaulting to int 0,
    # which gave the label_count column mixed types and breaks toDF
    # schema inference. (Also fixes the `dcit` typo.)
    date_to_count = {}
    for entry in row.partition_date_label_count_list:
        partition_date, _, count = entry.partition('_')
        date_to_count[partition_date] = int(count)
    return [
        (card_id, partition_date, label, date_to_count.get(partition_date, 0))
        for partition_date in partitionDatasBC.value
    ]

# Removed the stray no-argument `itemStatisticDF.rdd.flatMap()` call, which
# would raise TypeError (flatMap requires a function).
# NOTE: RDD.toDF takes a schema (a list of column names), not varargs the
# way DataFrame.toDF does — four positional strings would raise TypeError.
itemStatisticDF = itemStatisticDF.rdd.flatMap(splitPatitionDatasFlatMapFunc) \
    .toDF(["card_id", "partition_date", "label", "label_count"])
itemStatisticDF.show(1000, False)
sys.exit(1) sys.exit(1)
clickDF = spark.sql(clickSql) clickDF = spark.sql(clickSql)
clickDF.createOrReplaceTempView("clickDF") clickDF.createOrReplaceTempView("clickDF")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment