Commit 4a3f3fae authored by 宋柯's avatar 宋柯

模型bug修复

parent ebd503b3
...@@ -58,10 +58,17 @@ def parseTags(tags,i): ...@@ -58,10 +58,17 @@ def parseTags(tags,i):
else: else:
return "-1" return "-1"
def parseTagsFromArray(tagsArray, i):
    """Return the i-th tag (1-based) from tagsArray, or "-1" as a sentinel.

    Args:
        tagsArray: list of tag strings (already split from a comma-joined string).
        i: 1-based position of the wanted tag (callers use 1..5).

    Returns:
        tagsArray[i - 1] when 1 <= i <= len(tagsArray), otherwise the
        string "-1" used elsewhere in the pipeline as the missing-value marker.
    """
    # Bug fix: the original only checked len(tagsArray) >= i, so a zero or
    # negative index passed the check and silently indexed from the END of
    # the list (tagsArray[-1], tagsArray[-2], ...) instead of returning the
    # "-1" sentinel.
    if 1 <= i <= len(tagsArray):
        return tagsArray[i - 1]
    return "-1"
def numberToBucket(num): def numberToBucket(num):
res = 0 res = 0
if not num: if not num:
return str(res) return str(res)
num = int(num)
if num >= 1000: if num >= 1000:
res = 1000//10 res = 1000//10
else: else:
...@@ -82,48 +89,83 @@ def priceToBucket(num): ...@@ -82,48 +89,83 @@ def priceToBucket(num):
numberToBucketUdf = F.udf(numberToBucket, StringType()) numberToBucketUdf = F.udf(numberToBucket, StringType())
priceToBucketUdf = F.udf(priceToBucket, StringType()) priceToBucketUdf = F.udf(priceToBucket, StringType())
def addItemStaticFeatures(samples,itemDF,dataVocab): def getItemStaticFeatures(itemStatisticDays, startDay, endDay):
ctrUdf = F.udf(wilson_ctr, FloatType()) itemStatisticStartDay = addDays(-itemStatisticDays)
# item不设置over窗口,原因:item可能一直存在,统计数据按照最新即可 itemStatisticSql = getItemStatisticSql(itemStatisticStartDay, endDay)
print("item统计特征处理...")
#TODO item特征存在特征穿越 itemStatisticDF = spark.sql(itemStatisticSql)
staticFeatures = samples.groupBy('item_id').agg(F.count(F.lit(1)).alias('itemRatingCount'), # itemStatisticDF.show(100, False)
F.avg(F.col('rating')).alias('itemRatingAvg'),
F.stddev(F.col('rating')).alias('itemRatingStddev'), partitionDatas = generatePartitionDates(itemStatisticDays)
F.sum(when(F.col('label') == 1, F.lit(1)).otherwise(F.lit(0))).alias("itemClickCount"), partitionDatasBC = spark.sparkContext.broadcast(partitionDatas)
F.sum(when(F.col('label') == 0, F.lit(1)).otherwise(F.lit(0))).alias("itemExpCount")
).fillna(0) \ def splitPatitionDatasFlatMapFunc(row):
.withColumn('itemRatingStddev', F.format_number(F.col('itemRatingStddev'), NUMBER_PRECISION).cast("float")) \ card_id = row.card_id
.withColumn('itemRatingAvg', F.format_number(F.col('itemRatingAvg'), NUMBER_PRECISION).cast("float")) \ label = row.label
.withColumn('itemCtr',F.format_number(ctrUdf(F.col("itemClickCount"),(F.col("itemExpCount"))), NUMBER_PRECISION).cast("float")) partition_date_label_count_list = row.partition_date_label_count_list
partition_date_label_count_dcit = dict(map(lambda s: (s.split('_')[0], s.split('_')[1]), partition_date_label_count_list))
staticFeatures.show(20, truncate=False) res = []
for partition_date in partitionDatasBC.value:
staticFeatures = itemDF.join(staticFeatures, on=["item_id"], how='left') res.append((card_id, partition_date, label, partition_date_label_count_dcit.get(partition_date, '0')))
return res
itemStatisticDF = itemStatisticDF.rdd.flatMap(splitPatitionDatasFlatMapFunc).toDF(["card_id", "partition_date", "label", "label_count"])
itemStatisticDF.orderBy(['card_id', 'label', 'partition_date'])
itemStatisticDF.createOrReplaceTempView("itemStatisticDF")
itemStatisticSql = """
SELECT
card_id as item_id,
label,
partition_date,
label_count,
COALESCE(SUM(label_count) OVER(PARTITION BY card_id, label ORDER BY partition_date ROWS BETWEEN {itemStatisticStartDays} PRECEDING AND 1 PRECEDING), 0) label_count_sum,
COALESCE(AVG(label_count) OVER(PARTITION BY card_id, label ORDER BY partition_date ROWS BETWEEN {itemStatisticStartDays} PRECEDING AND 1 PRECEDING), 0) label_count_avg,
COALESCE(STDDEV_POP(label_count) OVER(PARTITION BY card_id, label ORDER BY partition_date ROWS BETWEEN {itemStatisticStartDays} PRECEDING AND 1 PRECEDING), 0) label_count_stddev
FROM
itemStatisticDF
WHERE partition_date >= '{startDay}' and partition_date <= '{endDay}'
""".format(itemStatisticStartDays = itemStatisticStartDays, startDay = startDay, endDay = endDay)
print("itemStatisticSql: {}".format(itemStatisticSql))
staticFeatures = spark.sql(itemStatisticSql)
clickStaticFeatures = staticFeatures.where(F.col('label') == F.lit(1))\
.withColumnRenamed('label_count_sum', 'click_count_sum')\
.withColumnRenamed('label_count_avg', 'click_count_avg')\
.withColumnRenamed('label_count_stddev', 'click_count_stddev')
expStaticFeatures = staticFeatures.where(F.col('label') == F.lit(0))\
.withColumnRenamed('label_count_sum', 'exp_count_sum')\
.withColumnRenamed('label_count_avg', 'exp_count_avg')\
.withColumnRenamed('label_count_stddev', 'exp_count_stddev')
clickStaticFeatures.show(20, truncate=False)
expStaticFeatures.show(20, truncate=False)
return clickStaticFeatures, expStaticFeatures
# ratingDF, itemEsFeatureDF, startDay, endDay
def addItemFeatures(samples, itemEsFeatureDF, clickStaticFeatures, expStaticFeatures):
samples_iEsF_iStatisticF = samples.join(clickStaticFeatures, on = ["item_id", "partition_date"], how = 'left')\
.join(expStaticFeatures, on = ["item_id", "partition_date"], how = 'left')\
.join(itemEsFeatureDF, on = ["item_id"], how = 'left')
# 连续特征分桶 # 连续特征分桶
bucket_vocab = [str(i) for i in range(101)]
bucket_suffix = "_Bucket" bucket_suffix = "_Bucket"
for col in ["itemRatingCount","itemRatingAvg", "itemClickCount", "itemExpCount"]: for col in ["click_count_sum", "click_count_avg", "exp_count_sum", "exp_count_avg"]:
new_col = col + bucket_suffix new_col = col + bucket_suffix
staticFeatures = staticFeatures.withColumn(new_col, numberToBucketUdf(F.col(col))) \ samples_iEsF_iStatisticF = samples_iEsF_iStatisticF.withColumn(new_col, numberToBucketUdf(F.col(col))) \
.drop(col) \ .drop(col) \
.withColumn(new_col, F.when(F.col(new_col).isNull(), "0").otherwise(F.col(new_col))) .withColumn(new_col, F.when(F.col(new_col).isNull(), "0").otherwise(F.col(new_col)))
dataVocab[new_col] = bucket_vocab
# 方差处理 # 方差处理
number_suffix = "_number" number_suffix = "_number"
for col in ["itemRatingStddev"]: for col in ["click_count_stddev", "exp_count_stddev"]:
new_col = col + number_suffix
staticFeatures = staticFeatures.withColumn(new_col, F.when(F.col(col).isNull(), 0).otherwise(1 / (F.col(col) + 1))).drop(col)
for col in ["itemCtr"]:
new_col = col + number_suffix new_col = col + number_suffix
staticFeatures = staticFeatures.withColumn(col, F.when(F.col(col).isNull(), 0).otherwise(F.col(col))).withColumnRenamed(col,new_col) samples_iEsF_iStatisticF = samples_iEsF_iStatisticF.withColumn(new_col, F.when(F.col(col).isNull(), 0).otherwise(1 / (F.col(col) + 1))).drop(col)
print("item size:", staticFeatures.count())
staticFeatures.show(5, truncate=False) return samples_iEsF_iStatisticF
return staticFeatures
def addUserStaticsFeatures(samples,dataVocab): def addUserStaticsFeatures(samples,dataVocab):
print("user统计特征处理...") print("user统计特征处理...")
...@@ -171,46 +213,37 @@ def flatten(items): ...@@ -171,46 +213,37 @@ def flatten(items):
else: else:
yield x yield x
def addItemFeatures(itemDF,dataVocab,multi_col_vocab): def itemEsFeaturesProcess(itemDF):
# multi_col = ['sku_tags', 'sku_show_tags','second_demands', 'second_solutions', 'second_positions'] onehot_cols = ['id', 'service_type', 'merchant_id', 'doctor_type', 'doctor_id', 'doctor_famous', 'hospital_id', 'hospital_city_tag_id', 'hospital_type', 'hospital_is_high_quality']
multi_col = ['tags_v3','second_demands', 'second_solutions', 'second_positions'] multi_cols = ['tags_v3', 'second_demands', 'second_solutions', 'second_positions']
onehot_col = ['id','service_type', 'merchant_id','doctor_type', 'doctor_id', 'doctor_famous', 'hospital_id', 'hospital_city_tag_id', 'hospital_type','hospital_is_high_quality']
for col in onehot_col: for onehot_col in onehot_cols:
new_c = ITEM_PREFIX + col itemDF[ITEM_PREFIX + onehot_col] = itemDF[onehot_col]
dataVocab[new_c] = list(set(itemDF[col].tolist())) itemDF = itemDF.drop(columns = onehot_cols)
itemDF[new_c] = itemDF[col]
itemDF = itemDF.drop(columns=onehot_col)
for c in multi_col: for multi_col in multi_cols:
#TODO 这里多标签的应该拆开 #TODO 这里多标签的应该拆开
multi_col_vocab[c] = list(set(flatten(map(lambda x: x.split(','), itemDF[c].tolist())))) # multi_col_unique = list(set(flatten(map(lambda x: x.split(','), itemDF[multi_col].tolist()))))
itemDF[multi_col] = itemDF[multi_col].map(lambda x: x.split(","))
for i in range(1, 6): for idx in range(1, 6):
new_c = ITEM_PREFIX + c + "__" + str(i) itemDF[ITEM_PREFIX + multi_col + "__" + str(idx)] = itemDF[multi_col].map(lambda tagArray: parseTagsFromArray(tagArray, idx))
itemDF[new_c] = itemDF[c].map(lambda x:parseTags(x,i)) itemDF = itemDF.drop(columns = multi_cols)
dataVocab[new_c] = multi_col_vocab[c]
# 连续特征分桶 # 连续特征分桶
bucket_vocab = [str(i) for i in range(101)] # bucket_vocab = [str(i) for i in range(101)]
bucket_suffix = "_Bucket" bucket_suffix = "_Bucket"
for col in ['case_count', 'sales_count']: for col in ['case_count', 'sales_count']:
new_col = ITEM_PREFIX + col + bucket_suffix itemDF[ITEM_PREFIX + col + bucket_suffix] = itemDF[col].map(numberToBucket)
itemDF[new_col] = itemDF[col].map(numberToBucket) itemDF = itemDF.drop(columns = [col])
itemDF = itemDF.drop(columns=[col])
dataVocab[new_col] = bucket_vocab
for col in ['sku_price']: for col in ['sku_price']:
new_col = ITEM_PREFIX + col + bucket_suffix itemDF[ITEM_PREFIX + col + bucket_suffix] = itemDF[col].map(priceToBucket)
itemDF[new_col] = itemDF[col].map(priceToBucket) itemDF = itemDF.drop(columns = [col])
itemDF = itemDF.drop(columns=[col])
dataVocab[new_col] = bucket_vocab
# 连续数据处理 # 连续数据处理
number_suffix = "_number" number_suffix = "_number"
for col in ["discount"]: for col in ["discount"]:
new_col = ITEM_PREFIX + col + number_suffix itemDF[ITEM_PREFIX + col + number_suffix] = itemDF[col]
itemDF[new_col] = itemDF[col]
itemDF = itemDF.drop(columns=[col]) itemDF = itemDF.drop(columns=[col])
return itemDF return itemDF
...@@ -656,137 +689,7 @@ def getItemStatisticSql(start, end): ...@@ -656,137 +689,7 @@ def getItemStatisticSql(start, end):
GROUP BY TT.card_id, TT.partition_date, TT.label GROUP BY TT.card_id, TT.partition_date, TT.label
) TTT ) TTT
GROUP BY TTT.card_id, TTT.label GROUP BY TTT.card_id, TTT.label
""".format(startDay=start,endDay=end) """.format(startDay = start,endDay = end)
print(sql)
return sql
def getClickSql2(start, end):
    """Build the Hive SQL that selects click (positive) samples.

    Selects distinct 'page_view' events on the 'welfare_detail' page with
    page_stay >= 1 between *start* and *end* (partition_date), inner-joined
    to devices active on the same day (active_type 1/2/4) from non-excluded
    promotion channels, and left-anti-joined against the abnormal-device
    dimension table to drop flagged devices.

    NOTE(review): relies on the module-level ACTION_REG regex to filter
    business_id values -- confirm its definition elsewhere in this file.

    Returns the formatted SQL string (also printed for debugging).
    """
sql = """
SELECT DISTINCT t1.partition_date, t1.cl_id device_id, t1.business_id card_id,t1.time_stamp time_stamp,t1.page_stay as page_stay
FROM
(select partition_date,cl_id,business_id,action,page_name,page_stay,time_stamp,page_stay
from online.bl_hdfs_maidian_updates
where action = 'page_view'
AND partition_date BETWEEN '{}' AND '{}'
AND page_name='welfare_detail'
AND page_stay>=1
AND cl_id is not null
AND cl_id != ''
AND business_id is not null
AND business_id != ''
AND business_id rlike '{}'
) AS t1
JOIN
(select partition_date,active_type,first_channel_source_type,device_id
from online.ml_device_day_active_status
where partition_date BETWEEN '{}' AND '{}'
AND active_type IN ('1', '2', '4')
AND first_channel_source_type not IN ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
,'promotion_shike','promotion_julang_jl03','promotion_zuimei')
AND first_channel_source_type not LIKE 'promotion\\_jf\\_%') as t2
ON t1.cl_id = t2.device_id
AND t1.partition_date = t2.partition_date
LEFT JOIN
(
select distinct device_id
from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
where PARTITION_DAY = regexp_replace(DATE_SUB(current_date,1) ,'-','')
AND is_abnormal_device = 'true'
)dev
on t1.cl_id=dev.device_id
WHERE dev.device_id is null
""".format(start, end, ACTION_REG, start, end)
# Positional placeholders bind as: start, end, ACTION_REG, start, end.
print(sql)
return sql
def getExposureSql2(start, end):
    """Build the Hive SQL that selects exposure (negative) samples.

    Selects distinct 'page_precise_exposure' events (is_exposure = 1) for
    the configured card_content_type, with page_stay hard-coded to 0,
    joined to same-day active devices (active_type 1/2/4) from non-excluded
    promotion channels, excluding devices flagged in the abnormal-device
    dimension table.

    NOTE(review): relies on the module-level ACTION_REG and CONTENT_TYPE
    globals for the card_id regex / content-type filter -- confirm their
    definitions elsewhere in this file.

    Returns the formatted SQL string (also printed for debugging).
    """
sql = """
SELECT DISTINCT t1.partition_date,t1.cl_id device_id,t1.card_id,t1.time_stamp, 0 as page_stay
FROM
(SELECT partition_date,cl_id,card_id,time_stamp
FROM online.ml_community_precise_exposure_detail
WHERE cl_id IS NOT NULL
AND card_id IS NOT NULL
AND card_id rlike '{}'
AND action='page_precise_exposure'
AND card_content_type = '{}'
AND is_exposure = 1 ) AS t1
LEFT JOIN online.ml_device_day_active_status AS t2 ON t1.cl_id = t2.device_id
AND t1.partition_date = t2.partition_date
LEFT JOIN
( SELECT DISTINCT device_id
FROM ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
WHERE PARTITION_DAY = regexp_replace(DATE_SUB(CURRENT_DATE,1),'-','')
AND is_abnormal_device = 'true' )dev
ON t1.cl_id=dev.device_id
WHERE dev.device_id IS NULL
AND t2.partition_date BETWEEN '{}' AND '{}'
AND t2.active_type IN ('1',
'2',
'4')
AND t2.first_channel_source_type NOT IN ('yqxiu1',
'yqxiu2',
'yqxiu3',
'yqxiu4',
'yqxiu5',
'mxyc1',
'mxyc2',
'mxyc3' ,
'wanpu',
'jinshan',
'jx',
'maimai',
'zhuoyi',
'huatian',
'suopingjingling',
'mocha',
'mizhe',
'meika',
'lamabang' ,
'js-az1',
'js-az2',
'js-az3',
'js-az4',
'js-az5',
'jfq-az1',
'jfq-az2',
'jfq-az3',
'jfq-az4',
'jfq-az5',
'toufang1' ,
'toufang2',
'toufang3',
'toufang4',
'toufang5',
'toufang6',
'TF-toufang1',
'TF-toufang2',
'TF-toufang3',
'TF-toufang4' ,
'TF-toufang5',
'tf-toufang1',
'tf-toufang2',
'tf-toufang3',
'tf-toufang4',
'tf-toufang5',
'benzhan',
'promotion_aso100' ,
'promotion_qianka',
'promotion_xiaoyu',
'promotion_dianru',
'promotion_malioaso',
'promotion_malioaso-shequ' ,
'promotion_shike',
'promotion_julang_jl03',
'promotion_zuimei')
AND t2.first_channel_source_type NOT LIKE 'promotion\\_jf\\_%'
""".format(ACTION_REG, CONTENT_TYPE, start, end)
# NOTE(review): the two lines below are garbled by the diff rendering
# (old and new column text merged); the intended code is presumably a
# single `print(sql)` followed by `return sql` -- verify against the repo.
print(sql) print(sql)
return sql return sql
...@@ -945,80 +848,26 @@ if __name__ == '__main__': ...@@ -945,80 +848,26 @@ if __name__ == '__main__':
itemStatisticStartDays = int(sys.argv[2]) itemStatisticStartDays = int(sys.argv[2])
print('trainDays:{}'.format(trainDays),flush=True) print('trainDays:{}'.format(trainDays),flush=True)
#行为数据的开始结束日期
endDay = addDays(0) endDay = addDays(0)
startDay = addDays(-int(trainDays)) startDay = addDays(-int(trainDays))
itemStatisticStartDay = addDays(-int(trainDays + itemStatisticStartDays))
print("train_data start:{} end:{}".format(startDay,endDay))
#item特征统计行为数据的开始结束日期
spark = get_spark("service_feature_csv_export") spark = get_spark("SERVICE_FEATURE_CSV_EXPORT_SK")
spark.sparkContext.setLogLevel("ERROR") spark.sparkContext.setLogLevel("ERROR")
# itemDF = get_service_feature_df() #获取行为数据
# print(itemDF.columns)
# print(itemDF.head(100))
# 行为数据
clickSql = getClickSql(startDay,endDay) clickSql = getClickSql(startDay,endDay)
expSql = getExposureSql(startDay,endDay) expSql = getExposureSql(startDay,endDay)
itemStatisticSql = getItemStatisticSql(itemStatisticStartDay, endDay)
itemStatisticDF = spark.sql(itemStatisticSql)
# itemStatisticDF.show(100, False)
partitionDatas = generatePartitionDates(trainDays + itemStatisticStartDays)
partitionDatasBC = spark.sparkContext.broadcast(partitionDatas)
def splitPatitionDatasFlatMapFunc(row):
card_id = row.card_id
label = row.label
partition_date_label_count_list = row.partition_date_label_count_list
partition_date_label_count_dcit = dict(map(lambda s: (s.split('_')[0], s.split('_')[1]), partition_date_label_count_list))
res = []
for partition_date in partitionDatasBC.value:
res.append((card_id, partition_date, label, partition_date_label_count_dcit.get(partition_date, '0')))
return res
itemStatisticDF = itemStatisticDF.rdd.flatMap(splitPatitionDatasFlatMapFunc).toDF(["card_id", "partition_date", "label", "label_count"])
itemStatisticDF.orderBy(['card_id', 'label', 'partition_date'])
itemStatisticDF.createOrReplaceTempView("itemStatisticDF")
# staticFeatures = samples.groupBy('item_id').agg(F.count(F.lit(1)).alias('itemRatingCount'),
# F.avg(F.col('rating')).alias('itemRatingAvg'),
# F.stddev(F.col('rating')).alias('itemRatingStddev'),
# F.sum(when(F.col('label') == 1, F.lit(1)).otherwise(F.lit(0))).alias("itemClickCount"),
# F.sum(when(F.col('label') == 0, F.lit(1)).otherwise(F.lit(0))).alias("itemExpCount")
# ).fillna(0) \
# .withColumn('itemRatingStddev', F.format_number(F.col('itemRatingStddev'), NUMBER_PRECISION).cast("float")) \
# .withColumn('itemRatingAvg', F.format_number(F.col('itemRatingAvg'), NUMBER_PRECISION).cast("float")) \
# .withColumn('itemCtr',F.format_number(ctrUdf(F.col("itemClickCount"),(F.col("itemExpCount"))), NUMBER_PRECISION).cast("float"))
itemStatisticSql = """
SELECT
card_id,
label,
partition_date,
label_count,
COALESCE(SUM(label_count) OVER(PARTITION BY card_id, label ORDER BY partition_date ROWS BETWEEN {itemStatisticStartDays} PRECEDING AND 1 PRECEDING), 0) label_count_sum,
COALESCE(AVG(label_count) OVER(PARTITION BY card_id, label ORDER BY partition_date ROWS BETWEEN {itemStatisticStartDays} PRECEDING AND 1 PRECEDING), 0) label_count_avg,
COALESCE(STDDEV_POP(label_count) OVER(PARTITION BY card_id, label ORDER BY partition_date ROWS BETWEEN {itemStatisticStartDays} PRECEDING AND 1 PRECEDING), 0) label_count_stddev
FROM
itemStatisticDF
""".format(itemStatisticStartDays = itemStatisticStartDays)
print(itemStatisticSql)
spark.sql(itemStatisticSql).show(100, False)
sys.exit(1)
clickDF = spark.sql(clickSql) clickDF = spark.sql(clickSql)
clickDF.createOrReplaceTempView("clickDF") clickDF.createOrReplaceTempView("clickDF")
clickDF.cache() clickDF.cache()
clickDF.show(100, False) print("click count: ", clickDF.count())
expDF = spark.sql(expSql) expDF = spark.sql(expSql)
expDF.createOrReplaceTempView("expDF") expDF.createOrReplaceTempView("expDF")
print("expDF before count: ", expDF.count()) expDF.cache()
# ratingDF = samplesNegAndUnion(clickDF,expDF) print("expDF 过滤点击数据前 count: ", expDF.count())
expDF = spark.sql(""" expDF = spark.sql("""
SELECT t1.partition_date, t1.device_id, t1.card_id, t1.time_stamp, t1.os, t1.user_city_id SELECT t1.partition_date, t1.device_id, t1.card_id, t1.time_stamp, t1.os, t1.user_city_id
FROM expDF t1 FROM expDF t1
...@@ -1030,10 +879,8 @@ if __name__ == '__main__': ...@@ -1030,10 +879,8 @@ if __name__ == '__main__':
AND t1.user_city_id = t2.user_city_id AND t1.user_city_id = t2.user_city_id
WHERE t2.device_id is NULL WHERE t2.device_id is NULL
""") """)
print("expDF after count: ", expDF.count()) print("expDF 过滤点击数据后 count: ", expDF.count())
print("click count: ", clickDF.count())
print("添加label...")
clickDF = clickDF.withColumn("label", F.lit(1)) clickDF = clickDF.withColumn("label", F.lit(1))
expDF = expDF.withColumn("label", F.lit(0)) expDF = expDF.withColumn("label", F.lit(0))
ratingDF = clickDF.union(expDF) ratingDF = clickDF.union(expDF)
...@@ -1045,57 +892,38 @@ if __name__ == '__main__': ...@@ -1045,57 +892,38 @@ if __name__ == '__main__':
.withColumn("user_city_id", F.when(F.col("user_city_id").isNull(), "-1").otherwise(F.col("user_city_id")))\ .withColumn("user_city_id", F.when(F.col("user_city_id").isNull(), "-1").otherwise(F.col("user_city_id")))\
.withColumn("timestamp",F.col("timestamp").cast("long")) .withColumn("timestamp",F.col("timestamp").cast("long"))
print(ratingDF.columns) ratingDF.cache()
print("ratingDF.columns: {}".format(ratingDF.columns))
print(ratingDF.show(100, truncate=False)) print(ratingDF.show(100, truncate=False))
expDF.unpersist(True)
clickDF.unpersist(True)
#item Es Feature
itemEsFeatureDF = get_service_feature_df()
print("itemEsFeatureDF.columns: {}".format(itemEsFeatureDF.columns))
print(itemEsFeatureDF.head(10))
#TODO 负样本为排除点击的数据 print("item es 特征工程")
# ratingSamplesWithLabel = addSampleLabel(ratingDF) item_es_feature_start_time = int(round(time.time()))
df = ratingDF.toPandas() itemEsFeatureDF = itemEsFeaturesProcess(itemEsFeatureDF)
df = pd.DataFrame(df) item_es_feature_end_time = int(round(time.time()))
print("item es 特征工程, 耗时: {}s".format(item_es_feature_end_time - item_es_feature_start_time))
# posCount = df.loc[df["label"]==1]["label"].count()
# negCount = df.loc[df["label"]==0]["label"].count()
# print("pos size:"+str(posCount),"neg size:"+str(negCount))
itemDF = get_service_feature_df()
print(itemDF.columns)
print(itemDF.head(10))
# itemDF.to_csv("/tmp/service_{}.csv".format(endDay))
# df.to_csv("/tmp/service_train_{}.csv".format(endDay))
# 数据字典
dataVocab = {}
multiVocab = {}
print("处理item特征...")
timestmp1 = int(round(time.time()))
itemDF = addItemFeatures(itemDF, dataVocab,multiVocab)
timestmp2 = int(round(time.time()))
print("处理item特征, 耗时s:{}".format(timestmp2 - timestmp1))
print("multiVocab:")
for k,v in multiVocab.items():
print(k,len(v))
print("dataVocab:")
for k, v in dataVocab.items():
print(k, len(v), v)
itemDF_spark = spark.createDataFrame(itemDF)
itemDF_spark.printSchema()
itemDF_spark.show(10, truncate=False)
itemEsFeatureDF = spark.createDataFrame(itemEsFeatureDF)
itemEsFeatureDF.printSchema()
itemEsFeatureDF.show(10, truncate=False)
clickStaticFeatures, expStaticFeatures = getItemStaticFeatures(itemStatisticStartDays + trainDays, startDay, endDay)
# item统计特征处理 # item统计特征处理
itemStaticDF = addItemStaticFeatures(ratingDF,itemDF_spark,dataVocab) samples_iEsF_iStatisticF = addItemFeatures(ratingDF, itemEsFeatureDF, clickStaticFeatures, expStaticFeatures)
# sys.exit(1) samples_iEsF_iStatisticF.show(50, truncate=False)
sys.exit(1)
# 统计数据处理 # 统计数据处理
# ratingSamplesWithLabel = addStaticsFeatures(ratingSamplesWithLabel,dataVocab) # ratingSamplesWithLabel = addStaticsFeatures(ratingSamplesWithLabel,dataVocab)
samples = ratingDF.join(itemStaticDF, on=['item_id'], how='inner') # samples = ratingDF.join(itemStaticDF, on=['item_id'], how='inner')
print("处理user特征...") print("处理user特征...")
samplesWithUserFeatures = addUserFeatures(samples,dataVocab,multiVocab) samplesWithUserFeatures = addUserFeatures(samples,dataVocab,multiVocab)
...@@ -1113,6 +941,10 @@ if __name__ == '__main__': ...@@ -1113,6 +941,10 @@ if __name__ == '__main__':
model_columns = user_columns + item_columns model_columns = user_columns + item_columns
featureColumnsToRedis(model_columns) featureColumnsToRedis(model_columns)
#特征字典
dataVocab = {}
multiVocab = {}
print("数据字典save...") print("数据字典save...")
print("dataVocab:", str(dataVocab.keys())) print("dataVocab:", str(dataVocab.keys()))
vocab_path = "../vocab/{}_vocab.json".format(VERSION) vocab_path = "../vocab/{}_vocab.json".format(VERSION)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment