Commit 57d4f6ea authored by 郭羽

Feature engineering optimization

parent 633be5ab
@@ -63,10 +63,11 @@ FEATURE_VOCAB_KEY = "Strategy:rec:vocab:service:" + VERSION
 FEATURE_COLUMN_KEY = "Strategy:rec:column:service:" + VERSION
 TRAIN_FILE_PATH = "service_feature_" + VERSION
+ITEM_PREFIX = "item_"

 def addItemFeatures(samples,itemDF):
-    prefix = "item_"
     itemDF = itemDF.withColumnRenamed("id", "itemid")
     # 数据过滤:无医生
     itemDF = itemDF.filter(col("doctor_id") != "-1")
@@ -74,18 +75,18 @@ def addItemFeatures(samples,itemDF):
     # null处理
     for c in ITEM_NUMBER_COLUMNS:
         print("null count:",c,itemDF.filter(col(c).isNull()).count())
-        itemDF = itemDF.withColumn(prefix+c,when(col(c).isNull(),0).otherwise(col(c)).cast("float")).drop(c)
+        itemDF = itemDF.withColumn(ITEM_PREFIX+c,when(col(c).isNull(),0).otherwise(col(c)).cast("float")).drop(c)
     for c in ITEM_CATE_COLUMNS:
         print("null count:", c, itemDF.filter(col(c).isNull()).count())
-        itemDF = itemDF.withColumn(prefix+c, F.when(F.col(c).isNull(), "-1").otherwise(F.col(c))).drop(c)
+        itemDF = itemDF.withColumn(ITEM_PREFIX+c, F.when(F.col(c).isNull(), "-1").otherwise(F.col(c))).drop(c)
     # 离散特征处理
     for c, v in ITEM_MULTI_COLUMN_EXTRA_MAP.items():
         print("null count:", c, itemDF.filter(col(c).isNull()).count())
         itemDF = itemDF.withColumn(c, F.when(F.col(c).isNull(), "-1").otherwise(F.col(c)))
         for i in range(1, v + 1):
-            new_c = prefix + c + "__" + str(i)
+            new_c = ITEM_PREFIX + c + "__" + str(i)
             itemDF = itemDF.withColumn(new_c, F.split(F.col(c), ",")[i - 1])
             itemDF = itemDF.withColumn(new_c, F.when(F.col(new_c).isNull(), "-1").otherwise(F.col(new_c)))
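A note on the multi-value handling above: each comma-separated column listed in ITEM_MULTI_COLUMN_EXTRA_MAP is expanded into a fixed number of positional sub-columns named ITEM_PREFIX + column + "__i". A minimal, self-contained sketch of the same idea (the toy data and the "tags" column name are illustrative, not taken from this repo):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").appName("split_demo").getOrCreate()

ITEM_PREFIX = "item_"
df = spark.createDataFrame([("1", "a,b,c"), ("2", "x"), ("3", None)], ["itemid", "tags"])

c, v = "tags", 3  # expand the "tags" column into 3 positional slots
df = df.withColumn(c, F.when(F.col(c).isNull(), "-1").otherwise(F.col(c)))
for i in range(1, v + 1):
    new_c = ITEM_PREFIX + c + "__" + str(i)
    # F.split returns an array column; missing positions come back as null, so backfill "-1"
    df = df.withColumn(new_c, F.split(F.col(c), ",")[i - 1])
    df = df.withColumn(new_c, F.when(F.col(new_c).isNull(), "-1").otherwise(F.col(new_c)))

df.show(truncate=False)
```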
@@ -109,7 +110,7 @@ def addItemFeatures(samples,itemDF):
         # pipelineStage.append(MinMaxScaler(inputCol=c, outputCol=c+"Scale"))
     # bucketing
-    bucketColumns = [prefix+"case_count", prefix+"ordered_user_ids_count", prefix+"lowest_price", "itemRatingCount", "itemRatingStddev","itemRatingAvg"]
+    bucketColumns = [ITEM_PREFIX+"case_count", ITEM_PREFIX+"ordered_user_ids_count", ITEM_PREFIX+"lowest_price", "itemRatingCount", "itemRatingStddev","itemRatingAvg"]
     for c in bucketColumns:
         pipelineStage.append(QuantileDiscretizer(numBuckets=10, inputCol=c, outputCol=c + "Bucket"))
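The bucketing loop only collects one QuantileDiscretizer per column into pipelineStage; the stages are fitted later as a single Pipeline elsewhere in the file. A standalone sketch of that pattern, with invented data and column names:

```python
import random
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import QuantileDiscretizer

spark = SparkSession.builder.master("local[1]").appName("bucket_demo").getOrCreate()

# toy frame with two numeric columns (names are illustrative)
rows = [(float(random.randint(0, 100)), random.random() * 10) for _ in range(200)]
df = spark.createDataFrame(rows, ["item_case_count", "itemRatingAvg"])

pipelineStage = []
for c in ["item_case_count", "itemRatingAvg"]:
    # each discretizer maps the raw value into one of 10 quantile buckets, written to "<col>Bucket"
    pipelineStage.append(QuantileDiscretizer(numBuckets=10, inputCol=c, outputCol=c + "Bucket"))

bucketed = Pipeline(stages=pipelineStage).fit(df).transform(df)
bucketed.select("item_case_count", "item_case_countBucket").show(5)
```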
@@ -193,7 +194,7 @@ def addUserFeatures(samples):
 def addSampleLabel(ratingSamples):
-    ratingSamples = ratingSamples.withColumn('label', when(F.col('rating') >= 1, 1).otherwise(0))
+    ratingSamples = ratingSamples.withColumn('label', when(F.col('rating') >= 8, 1).otherwise(0))
     ratingSamples.show(5, truncate=False)
     ratingSamples.printSchema()
     return ratingSamples
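The only change in addSampleLabel is the positive-label cutoff moving from rating >= 1 to rating >= 8. A toy illustration of how the two thresholds differ (values invented):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

spark = SparkSession.builder.master("local[1]").appName("label_demo").getOrCreate()

df = spark.createDataFrame([(0.0,), (1.0,), (7.0,), (8.0,), (25.0,)], ["rating"])

# old rule: anything with rating >= 1 was a positive sample
# new rule: only rating >= 8 counts as positive
df = df.withColumn("label_old", when(col("rating") >= 1, 1).otherwise(0)) \
       .withColumn("label_new", when(col("rating") >= 8, 1).otherwise(0))
df.show()
```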
@@ -224,7 +225,7 @@ def splitAndSaveTrainingTestSamplesByTimeStamp(samples,splitTimestamp, file_path
     test.write.option("header", "true").option("delimiter", "|").mode('overwrite').csv(testSavePath)
-def getDataVocab(samples):
+def getDataVocab(samples,model_columns):
     dataVocab = {}
     multiVocab = {}
@@ -241,8 +242,8 @@ def getDataVocab(samples):
         multiVocab[c] = list(tagSet)
         samples = samples.drop(c)
-    # id类特征
-    for c in ["userid","itemid"]:
+    # id类特征 和 类别特征
+    for c in ["userid","itemid"] + [ITEM_PREFIX + c for c in ITEM_CATE_COLUMNS]:
         datas = samples.select(c).distinct().collect()
         vocabSet = set()
         for d in datas:
@@ -252,9 +253,9 @@ def getDataVocab(samples):
         dataVocab[c] = list(vocabSet)
         pass
-    for c in samples.columns:
+    for c in model_columns:
         print("col",c)
-        # 判断是否以Bucket结尾 和 类别特征
+        # 判断是否以Bucket结尾
         if c.endswith("Bucket"):
            datas = samples.select(c).distinct().collect()
            vocabSet = set()
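getDataVocab now also builds vocabularies for the ITEM_PREFIX-ed category columns and walks model_columns (instead of all sample columns) when looking for *Bucket features. A reduced sketch of the distinct-value collection it performs; the data and the ITEM_CATE_COLUMNS value below are placeholders:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("vocab_demo").getOrCreate()

ITEM_PREFIX = "item_"
ITEM_CATE_COLUMNS = ["merchant"]  # illustrative; the real list lives at the top of the file

samples = spark.createDataFrame(
    [("u1", "i1", "m1"), ("u2", "i2", "m1"), ("u1", "i3", "m2")],
    ["userid", "itemid", ITEM_PREFIX + "merchant"])

dataVocab = {}
for c in ["userid", "itemid"] + [ITEM_PREFIX + c for c in ITEM_CATE_COLUMNS]:
    vocabSet = set()
    for d in samples.select(c).distinct().collect():
        if d[c] is not None:          # guard against nulls before casting to str
            vocabSet.add(str(d[c]))
    dataVocab[c] = list(vocabSet)

print(dataVocab)
```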
@@ -350,8 +351,103 @@ def getEsConn_test():
 def getEsConn():
     return Elasticsearch(SERVICE_HOSTS, http_auth=('elastic', 'gengmei!@#'), timeout=3600)
-def getClickData(spark, start, end):
-    positiveSql = """
+def getClickSql(start, end):
+    sql = """
SELECT DISTINCT t1.partition_date, t1.cl_id device_id, t1.card_id,t1.time_stamp,t1.page_stay
FROM
(
select partition_date,cl_id,business_id as card_id,time_stamp,page_stay
from online.bl_hdfs_maidian_updates
where action = 'page_view'
AND partition_date>='{startDay}' and partition_date<'{endDay}'
AND page_name='welfare_detail'
-- AND page_stay>=1
AND cl_id is not null
AND cl_id != ''
AND business_id is not null
AND business_id != ''
group by partition_date,cl_id,business_id,time_stamp,page_stay
) AS t1
join
( --渠道,新老
SELECT distinct device_id
FROM online.ml_device_day_active_status
where partition_date>='{startDay}' and partition_date<'{endDay}'
AND active_type in ('1','2','4')
and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
,'promotion_shike','promotion_julang_jl03','promotion_zuimei','','unknown')
AND first_channel_source_type not like 'promotion\_jf\_%'
) t2
on t1.cl_id = t2.device_id
LEFT JOIN
( --去除黑名单
select distinct device_id
from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
where PARTITION_DAY =regexp_replace(DATE_SUB(current_date,1) ,'-','')
AND is_abnormal_device = 'true'
)t3
on t3.device_id=t2.device_id
WHERE t3.device_id is null
""".format(startDay=start,endDay=end)
print(sql)
return sql
def getExposureSql(start, end):
sql = """
SELECT DISTINCT t1.partition_date,t1.cl_id device_id,t1.card_id,t1.time_stamp, 0 as page_stay
from
( --新首页卡片曝光
SELECT partition_date,cl_id,card_id,time_stamp
FROM online.ml_community_precise_exposure_detail
where partition_date>='{startDay}' and partition_date<'{endDay}'
and action in ('page_precise_exposure','home_choiceness_card_exposure')
and cl_id IS NOT NULL
and card_id IS NOT NULL
and is_exposure='1'
and page_name='home'
and tab_name='精选'
and card_type in ('card','video')
and card_content_type in ('service')
group by partition_date,cl_id,card_id,time_stamp
) t1
join
( --渠道,新老
SELECT distinct device_id
FROM online.ml_device_day_active_status
where partition_date>='{startDay}' and partition_date<'{endDay}'
AND active_type in ('1','2','4')
and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
,'promotion_shike','promotion_julang_jl03','promotion_zuimei','','unknown')
AND first_channel_source_type not like 'promotion\_jf\_%'
) t2
on t1.cl_id = t2.device_id
LEFT JOIN
( --去除黑名单
select distinct device_id
from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
where PARTITION_DAY =regexp_replace(DATE_SUB(current_date,1) ,'-','')
AND is_abnormal_device = 'true'
)t3
on t3.device_id=t2.device_id
WHERE t3.device_id is null
""".format(startDay=start,endDay=end)
print(sql)
return sql
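Both query builders splice the date window in with str.format and named placeholders; the % wildcards in the LIKE patterns need no escaping because str.format only treats braces specially. A stripped-down, self-contained sketch of the pattern (hypothetical table name, shortened predicates, date format assumed):

```python
def build_window_sql(start, end):
    # {startDay}/{endDay} are the only substitution points; '%' passes through str.format untouched
    sql = """
        SELECT cl_id, card_id
        FROM some_click_log  -- hypothetical table
        WHERE partition_date >= '{startDay}' AND partition_date < '{endDay}'
          AND first_channel_source_type NOT LIKE 'promotion\\_jf\\_%'
    """.format(startDay=start, endDay=end)
    return sql

print(build_window_sql("20210101", "20210108"))
```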
def getClickSql2(start, end):
sql = """
 SELECT DISTINCT t1.partition_date, t1.cl_id device_id, t1.business_id card_id,t1.time_stamp time_stamp,t1.page_stay as page_stay
 FROM
 (select partition_date,cl_id,business_id,action,page_name,page_stay,time_stamp,page_stay
@@ -391,16 +487,14 @@ def getClickData(spark, start, end):
 on t1.cl_id=dev.device_id
 WHERE dev.device_id is null
     """.format(start, end, ACTION_REG, start, end)
-    print(positiveSql)
-    return spark.sql(positiveSql)
+    print(sql)
+    return sql

-def getExposureData(spark, start, end):
-    negSql = """
+def getExposureSql2(start, end):
+    sql = """
 SELECT DISTINCT t1.partition_date,t1.cl_id device_id,t1.card_id,t1.time_stamp, 0 as page_stay
 FROM
-(SELECT *
+(SELECT partition_date,cl_id,card_id,time_stamp
 FROM online.ml_community_precise_exposure_detail
 WHERE cl_id IS NOT NULL
 AND card_id IS NOT NULL
@@ -479,8 +573,8 @@ def getExposureData(spark, start, end):
 AND t2.first_channel_source_type NOT LIKE 'promotion\\_jf\\_%'
     """.format(ACTION_REG, CONTENT_TYPE, start, end)
-    print(negSql)
-    return spark.sql(negSql)
+    print(sql)
+    return sql
 def connectDoris(spark, table):
     return spark.read \
@@ -626,26 +720,24 @@ if __name__ == '__main__':
     #入参
     trainDays = int(sys.argv[1])
     print('trainDays:{}'.format(trainDays),flush=True)
-    spark = get_spark("service_feature_csv_export")
-    spark.sparkContext.setLogLevel("ERROR")
     endDay = addDays(-1)
     startDay = addDays(-(1 + int(trainDays)))
     print("train_data start:{} end:{}".format(startDay,endDay))
+    spark = get_spark("service_feature_csv_export")
+    spark.sparkContext.setLogLevel("ERROR")
     itemDF = get_service_feature_df(spark)
     # 行为数据
-    clickDF = getClickData(spark,startDay,endDay)
-    exposureDF = getExposureData(spark,startDay,endDay)
-    ratingDF = samplesNegAndUnion(clickDF,exposureDF)
-    # conf = SparkConf().setAppName('featureEngineering').setMaster('local')
-    # spark = SparkSession.builder.config(conf=conf).getOrCreate()
-    # spark.sparkContext.setLogLevel("ERROR")
-    # itemDF = spark.read.format('csv').option('header', 'true').option('sep', '|').load("service_item.csv")
-    # ratingDF = spark.read.format('csv').option('header', 'true').option('sep', '|').load("service_rating.csv")
-    # ratingDF = ratingDF.withColumn("rating",F.when(col("label")>=1,1).otherwise(0))
+    clickSql = getClickSql(startDay,endDay)
+    print("--------")
+    expSql = getExposureSql(startDay,endDay)
+    clickDF = spark.sql(clickSql)
+    expDF = spark.sql(expSql)
+    # ratingDF = samplesNegAndUnion(clickDF,expDF)
+    ratingDF = clickDF.union(expDF)
     ratingDF = ratingDF.withColumnRenamed("time_stamp", "timestamp")\
         .withColumnRenamed("device_id", "userid")\
@@ -717,7 +809,7 @@ if __name__ == '__main__':
     # 离散数据字典生成
     print("数据字典生成...")
-    dataVocab = getDataVocab(samplesWithUserFeatures)
+    dataVocab = getDataVocab(samplesWithUserFeatures,model_columns)
     timestmp8 = int(round(time.time()))
     print("数据字典生成 耗时s:{}".format(timestmp8 - timestmp7))
     # 字典转为json 存入redis
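The comment above refers to persisting the vocabulary as JSON in Redis; that code sits outside this hunk. A hedged sketch of what such a write could look like (host, port, and the VERSION suffix are assumptions, not taken from the repo):

```python
# illustrative sketch only -- connection details and the key suffix are assumptions,
# the real write happens in code outside this hunk
import json
import redis

FEATURE_VOCAB_KEY = "Strategy:rec:vocab:service:" + "v1"  # VERSION value assumed

def save_vocab_to_redis(dataVocab, host="127.0.0.1", port=6379, db=0):
    conn = redis.Redis(host=host, port=port, db=db)
    # one JSON blob per version keeps the serving-side read to a single GET
    conn.set(FEATURE_VOCAB_KEY, json.dumps(dataVocab, ensure_ascii=False))

save_vocab_to_redis({"userid": ["u1", "u2"], "itemid": ["i1"]})
```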
......