Commit 00a38405 authored by 宋柯

模型上线

parent e350d105
...@@ -709,12 +709,7 @@ def getItemStatisticSql(start, end): ...@@ -709,12 +709,7 @@ def getItemStatisticSql(start, end):
on t1.cl_id = t2.device_id on t1.cl_id = t2.device_id
LEFT JOIN LEFT JOIN
( --去除黑名单 black_device_df t3
select distinct device_id
from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
where PARTITION_DAY =regexp_replace(DATE_SUB(current_date,1) ,'-','')
AND is_abnormal_device = 'true'
)t3
on t3.device_id=t2.device_id on t3.device_id=t2.device_id
WHERE t3.device_id is null WHERE t3.device_id is null
UNION UNION
...@@ -754,12 +749,7 @@ def getItemStatisticSql(start, end): ...@@ -754,12 +749,7 @@ def getItemStatisticSql(start, end):
) t2 ) t2
on t1.cl_id = t2.device_id on t1.cl_id = t2.device_id
LEFT JOIN LEFT JOIN
( --去除黑名单 black_device_df t3
select distinct device_id
from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
where PARTITION_DAY =regexp_replace(DATE_SUB(current_date,1) ,'-','')
AND is_abnormal_device = 'true'
)t3
on t3.device_id=t2.device_id on t3.device_id=t2.device_id
WHERE t3.device_id is null WHERE t3.device_id is null
) T ) T
...@@ -814,12 +804,7 @@ def getPredictItemStatisticSql(start, end): ...@@ -814,12 +804,7 @@ def getPredictItemStatisticSql(start, end):
) t2 ) t2
on t1.cl_id = t2.device_id on t1.cl_id = t2.device_id
LEFT JOIN LEFT JOIN
( --去除黑名单 black_device_df t3
select distinct device_id
from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
where PARTITION_DAY =regexp_replace(DATE_SUB(current_date,1) ,'-','')
AND is_abnormal_device = 'true'
)t3
on t3.device_id=t2.device_id on t3.device_id=t2.device_id
WHERE t3.device_id is null WHERE t3.device_id is null
UNION UNION
...@@ -859,12 +844,7 @@ def getPredictItemStatisticSql(start, end): ...@@ -859,12 +844,7 @@ def getPredictItemStatisticSql(start, end):
) t2 ) t2
on t1.cl_id = t2.device_id on t1.cl_id = t2.device_id
LEFT JOIN LEFT JOIN
( --去除黑名单 black_device_df t3
select distinct device_id
from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
where PARTITION_DAY =regexp_replace(DATE_SUB(current_date,1) ,'-','')
AND is_abnormal_device = 'true'
)t3
on t3.device_id=t2.device_id on t3.device_id=t2.device_id
WHERE t3.device_id is null WHERE t3.device_id is null
) T ) T
...@@ -1046,8 +1026,17 @@ def get_click_exp_start_end_time(trainDays): ...@@ -1046,8 +1026,17 @@ def get_click_exp_start_end_time(trainDays):
def get_and_save_card_feature(itemEsFeatureDF, predictClickStaticFeatures, predictExpStaticFeatures, fields_na_value_dict): def get_and_save_card_feature(itemEsFeatureDF, predictClickStaticFeatures, predictExpStaticFeatures, fields_na_value_dict):
itemFeature = itemEsFeatureDF.join(predictClickStaticFeatures, on = 'card_id', how = 'left').join(predictExpStaticFeatures, on = 'card_id', how = 'left') itemFeature = itemEsFeatureDF.join(predictClickStaticFeatures, on = 'card_id', how = 'left').join(predictExpStaticFeatures, on = 'card_id', how = 'left')
itemFeature = itemFeature.na.fill(fields_na_value_dict) fields = [field.name for field in itemFeature.schema.fields]
columns_used = list(filter(lambda c: c.startswith('ITEM_'), fields))
for k in fields_na_value_dict:
if k not in columns_used:
fields_na_value_dict.pop(k)
itemFeature.printSchema() itemFeature.printSchema()
print('columns_used:', columns_used)
print('fields_na_value_dict:', fields_na_value_dict)
itemFeature = itemFeature.na.fill(fields_na_value_dict)
itemFeatureDF = itemFeature.toPandas() itemFeatureDF = itemFeature.toPandas()
conn = getRedisConn() conn = getRedisConn()
BATCH = 5000 BATCH = 5000
...@@ -1056,8 +1045,6 @@ def get_and_save_card_feature(itemEsFeatureDF, predictClickStaticFeatures, predi ...@@ -1056,8 +1045,6 @@ def get_and_save_card_feature(itemEsFeatureDF, predictClickStaticFeatures, predi
row = row[1] row = row[1]
return '|'.join(row[columns_used].astype(str)) return '|'.join(row[columns_used].astype(str))
columns_used = list(filter(lambda c: c.startswith('ITEM_'), list(itemFeatureDF.columns)))
print('columns_used: ', columns_used)
conn.delete(Key_TMP) conn.delete(Key_TMP)
for start in range(0, len(itemFeatureDF), BATCH): for start in range(0, len(itemFeatureDF), BATCH):
conn.hmset(Key_TMP, {row[1]['card_id']: concat_service_feature(row) for row in itemFeatureDF.iloc[start: start + BATCH].iterrows()}) conn.hmset(Key_TMP, {row[1]['card_id']: concat_service_feature(row) for row in itemFeatureDF.iloc[start: start + BATCH].iterrows()})
...@@ -1113,7 +1100,7 @@ def get_and_save_device_feature(spark, fields_na_value_dict, days = 180): ...@@ -1113,7 +1100,7 @@ def get_and_save_device_feature(spark, fields_na_value_dict, days = 180):
print(sql) print(sql)
device_feature_df = spark.sql(sql) device_feature_df = spark.sql(sql)
device_feature_df = device_feature_df.na.fill(fields_na_value_dict) device_feature_df = device_feature_df.na.fill({'os': '-1'})
device_feature_df.printSchema() device_feature_df.printSchema()
device_feature_df = device_feature_df.toPandas() device_feature_df = device_feature_df.toPandas()
conn = getRedisConn() conn = getRedisConn()
......
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment