Commit d468d022 authored by 郭羽

美购 (beauty-purchase) fine-ranking model

parent 7fc7ec68
@@ -75,25 +75,39 @@ def getTrainColumns(train_columns,data_vocab):
     inputs = {}
     # 离散特征 (discrete features)
     for feature in train_columns:
-        # if data_vocab.get(feature):
-        #     cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature,vocabulary_list=data_vocab[feature])
-        #     col = tf.feature_column.embedding_column(cat_col, 10)
-        #     columns.append(col)
-        #     dataColumns.append(feature)
-        #     inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
-        if feature.startswith("userRatedHistory") or feature.count("__") > 0 or feature in embedding_columns:
-            cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=data_vocab[feature])
-            col = tf.feature_column.embedding_column(cat_col, 10)
-            columns.append(col)
-            dataColumns.append(feature)
-            inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
-        elif feature in one_hot_columns or feature.count("Bucket") > 0:
-            cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=data_vocab[feature])
-            col = tf.feature_column.indicator_column(cat_col)
-            columns.append(col)
-            dataColumns.append(feature)
-            inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
+        if data_vocab.get(feature):
+            if feature.count("__") > 0 or feature.count("Bucket") > 0:
+                cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=data_vocab[feature])
+                col = tf.feature_column.embedding_column(cat_col, 5)
+                columns.append(col)
+                dataColumns.append(feature)
+                inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
+            elif feature in one_hot_columns:
+                cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=data_vocab[feature])
+                col = tf.feature_column.indicator_column(cat_col)
+                columns.append(col)
+                dataColumns.append(feature)
+                inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
+            else:
+                cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=data_vocab[feature])
+                col = tf.feature_column.embedding_column(cat_col, 10)
+                columns.append(col)
+                dataColumns.append(feature)
+                inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
+        # if feature.startswith("userRatedHistory") or feature.count("__") > 0 or feature in embedding_columns:
+        #     cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=data_vocab[feature])
+        #     # col = tf.feature_column.embedding_column(cat_col, 10)
+        #     col = tf.feature_column.indicator_column(cat_col)
+        #     columns.append(col)
+        #     dataColumns.append(feature)
+        #     inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
+        # elif feature in one_hot_columns or feature.count("Bucket") > 0:
+        #     cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=data_vocab[feature])
+        #     col = tf.feature_column.indicator_column(cat_col)
+        #     columns.append(col)
+        #     dataColumns.append(feature)
+        #     inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
         elif feature in ITEM_NUMBER_COLUMNS:
             col = tf.feature_column.numeric_column(feature)
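Note: the new branching keys entirely off membership in data_vocab. Cross features ("__") and bucketized features now get 5-dim embeddings, declared one-hot features get indicator columns, and everything else in the vocab falls back to a 10-dim embedding. A minimal, self-contained sketch of the same logic in the tf.feature_column API (toy vocabulary and feature names, not from this repo):

import tensorflow as tf

# toy stand-ins for the script's globals (hypothetical values)
data_vocab = {"user__tags_v3__1": ["-1", "lift", "nose"], "os": ["ios", "android"]}
one_hot_columns = ["os"]

columns, dataColumns, inputs = [], [], {}
for feature, vocab in data_vocab.items():
    cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=vocab)
    if feature.count("__") > 0 or feature.count("Bucket") > 0:
        col = tf.feature_column.embedding_column(cat_col, 5)    # dense 5-dim embedding
    elif feature in one_hot_columns:
        col = tf.feature_column.indicator_column(cat_col)       # one-hot / multi-hot
    else:
        col = tf.feature_column.embedding_column(cat_col, 10)   # default 10-dim embedding
    columns.append(col)
    dataColumns.append(feature)
    inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')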
@@ -108,7 +122,9 @@ def getTrainColumns(train_columns,data_vocab):
 def train(columns,inputs,train_dataset):
     deep = tf.keras.layers.DenseFeatures(columns)(inputs)
     deep = tf.keras.layers.Dense(64, activation='relu')(deep)
+    deep = tf.keras.layers.Dropout(0.2)(deep)
     deep = tf.keras.layers.Dense(64, activation='relu')(deep)
+    deep = tf.keras.layers.Dropout(0.2)(deep)
     output_layer = tf.keras.layers.Dense(1, activation='sigmoid')(deep)
     model = tf.keras.Model(inputs, output_layer)
@@ -119,13 +135,13 @@ def train(columns,inputs,train_dataset):
         metrics=['accuracy', tf.keras.metrics.AUC(curve='ROC'), tf.keras.metrics.AUC(curve='PR')])
     # train the model
-    print("train start...")
     model.fit(train_dataset, epochs=5)
-    print("train end...")
     print("train save...")
     model.save(model_file, include_optimizer=False, save_format='tf')
+    return model

 def evaluate(model,test_dataset):
     # evaluate the model
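For review convenience, the two train() hunks above assemble to the body below (the commit also fixes the missing return, without which the later evaluate(model, ...) would receive None). The compile() loss and optimizer sit outside the visible hunk, so 'binary_crossentropy' and 'adam' here are assumptions, and model_file is a hypothetical stand-in for the script's global save path:

import tensorflow as tf

model_file = "./beauty_ranker_tf"  # hypothetical; the script defines its own path

def train(columns, inputs, train_dataset):
    # two 64-unit ReLU layers over the feature columns, each now followed
    # by Dropout(0.2) for regularization, then a sigmoid CTR head
    deep = tf.keras.layers.DenseFeatures(columns)(inputs)
    deep = tf.keras.layers.Dense(64, activation='relu')(deep)
    deep = tf.keras.layers.Dropout(0.2)(deep)
    deep = tf.keras.layers.Dense(64, activation='relu')(deep)
    deep = tf.keras.layers.Dropout(0.2)(deep)
    output_layer = tf.keras.layers.Dense(1, activation='sigmoid')(deep)
    model = tf.keras.Model(inputs, output_layer)
    model.compile(loss='binary_crossentropy',   # assumed; not shown in the hunk
                  optimizer='adam',             # assumed; not shown in the hunk
                  metrics=['accuracy', tf.keras.metrics.AUC(curve='ROC'), tf.keras.metrics.AUC(curve='PR')])
    model.fit(train_dataset, epochs=5)
    print("train save...")
    model.save(model_file, include_optimizer=False, save_format='tf')
    return model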
@@ -185,10 +201,11 @@ if __name__ == '__main__':
     test_data = getDataSet(df_test,shuffleSize=testSize)

+    print("train start...")
     timestmp3 = int(round(time.time()))
     model = train(trainColumns,inputs,train_data)
     timestmp4 = int(round(time.time()))
-    print("读取数据耗时h:{}".format((timestmp4 - timestmp3)/60/60))
+    print("train end...耗时h:{}".format((timestmp4 - timestmp3)/60/60))
     evaluate(model,test_data)
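The relabeled print fixes a copy-paste slip: the old message said 读取数据耗时h ("data-loading time in hours") while actually timing training. A tiny sketch of the same seconds-to-hours timing pattern (sleep stands in for the train call):

import time

t0 = int(round(time.time()))
time.sleep(1)                      # stand-in for train(...)
t1 = int(round(time.time()))
print("train end...耗时h:{}".format((t1 - t0) / 60 / 60))  # 耗时h = elapsed hours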
...
@@ -32,13 +32,22 @@ import utils.connUtils as connUtils
 特征工程 (feature engineering)
 """
-ITEM_MULTI_COLUMN_EXTRA_MAP = {"first_demands": 10,
-                               "second_demands": 30,
-                               "first_solutions": 2,
-                               "second_solutions": 14,
-                               "first_positions": 1,
-                               "second_positions": 20,
-                               "tags_v3": 30,
+ITEM_MULTI_COLUMN_EXTRA_MAP = {"first_demands": 1,
+                               "second_demands": 5,
+                               "first_solutions": 1,
+                               "second_solutions": 5,
+                               "first_positions": 1,
+                               "second_positions": 5,
+                               "tags_v3": 10,
+                               }
+
+USER_MULTI_COLUMN_EXTRA_MAP = {"first_demands": 1,
+                               "second_demands": 3,
+                               "first_solutions": 1,
+                               "second_solutions": 3,
+                               "first_positions": 1,
+                               "second_positions": 3,
+                               "tags_v3": 5,
                                }
 ITEM_NUMBER_COLUMNS = ["lowest_price","smart_rank2","case_count","ordered_user_ids_count"]
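These counts set how many fixed slots each multi-value field is flattened into downstream (c__1 … c__N, padded with "-1"); the item-side caps drop sharply (e.g. tags_v3 from 30 to 10) and the new user-side map is smaller still. A plain-Python illustration of the slotting convention (expand_slots is a hypothetical helper, not in the repo):

USER_MULTI_COLUMN_EXTRA_MAP = {"tags_v3": 5}

def expand_slots(prefix, values, n):
    # keep the first n values, pad the remainder with "-1"
    values = values or []
    return {"%s__%d" % (prefix, i + 1): (values[i] if i < len(values) else "-1")
            for i in range(n)}

print(expand_slots("user__tags_v3", ["lift", "nose"], USER_MULTI_COLUMN_EXTRA_MAP["tags_v3"]))
# {'user__tags_v3__1': 'lift', 'user__tags_v3__2': 'nose',
#  'user__tags_v3__3': '-1', 'user__tags_v3__4': '-1', 'user__tags_v3__5': '-1'}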
@@ -125,23 +134,21 @@ def addUserFeatures(samples):
         .filter(F.col("userRatingCount") > 1)
     # user偏好 (user preferences)
-    for c in ["first_demands","second_demands","first_solutions","second_solutions","first_positions","second_positions"]:
-        new_col = "user" + "__"+c
-        samples = samples\
-            .withColumn(new_col, extractTagsUdf(F.collect_list(when(F.col('label') == 1, F.col(c)).otherwise(F.lit(None))).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)))) \
-            .withColumn(new_col+"__1", F.when(F.col(new_col)[0].isNotNull(),F.col(new_col)[0]).otherwise("-1")) \
-            .withColumn(new_col+"__2", F.when(F.col(new_col)[1].isNotNull(),F.col(new_col)[1]).otherwise("-1")) \
-            .withColumn(new_col+"__3", F.when(F.col(new_col)[2].isNotNull(),F.col(new_col)[2]).otherwise("-1")) \
-            .drop(new_col)
+    for c,v in USER_MULTI_COLUMN_EXTRA_MAP.items():
+        new_col = "user" + "__" + c
+        samples = samples.withColumn(new_col, extractTagsUdf(F.collect_list(when(F.col('label') == 1, F.col(c)).otherwise(F.lit(None))).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1))))
+        for i in range(1, v+1):
+            samples = samples.withColumn(new_col + "__" + str(i), F.when(F.col(new_col)[i - 1].isNotNull(), F.col(new_col)[i - 1]).otherwise("-1"))
+        samples = samples.drop(new_col)
     # .drop(c).drop(new_col)
     # tags
-    c = "tags_v3"
-    new_col = "user" + "__" + c
-    samples = samples.withColumn(new_col, extractTagsUdf(F.collect_list(when(F.col('label') == 1, F.col(c)).otherwise(F.lit(None))).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1))))
-    for i in range(1,10):
-        samples = samples.withColumn(new_col+"__"+str(i), F.when(F.col(new_col)[i-1].isNotNull(),F.col(new_col)[i-1]).otherwise("-1"))
-    samples = samples.drop(new_col)
+    # c = "tags_v3"
+    # new_col = "user" + "__" + c
+    # samples = samples.withColumn(new_col, extractTagsUdf(F.collect_list(when(F.col('label') == 1, F.col(c)).otherwise(F.lit(None))).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1))))
+    # for i in range(1,10):
+    #     samples = samples.withColumn(new_col+"__"+str(i), F.when(F.col(new_col)[i-1].isNotNull(),F.col(new_col)[i-1]).otherwise("-1"))
+    # samples = samples.drop(new_col)

     pipelineStage = []
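The refactor replaces the hard-coded three slots per preference column (and the separate tags_v3 block) with the per-column counts from USER_MULTI_COLUMN_EXTRA_MAP, keeping the same 100-event look-back window over each user's positively labelled history. A self-contained PySpark sketch of the new loop (toy data and a toy extractTagsUdf; the real UDF is defined elsewhere in this file):

from pyspark.sql import SparkSession, Window, functions as F
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.master("local[1]").getOrCreate()
samples = spark.createDataFrame(
    [("u1", 1, 1, "a,b"), ("u1", 2, 1, "b,c"), ("u1", 3, 0, "d")],
    ["userid", "timestamp", "label", "tags_v3"])

# toy stand-in: split comma-joined tag strings and flatten them
extractTagsUdf = F.udf(lambda rows: [t for r in (rows or []) for t in r.split(",")],
                       ArrayType(StringType()))

USER_MULTI_COLUMN_EXTRA_MAP = {"tags_v3": 5}
window = Window.partitionBy("userid").orderBy("timestamp").rowsBetween(-100, -1)
for c, v in USER_MULTI_COLUMN_EXTRA_MAP.items():
    new_col = "user" + "__" + c
    # collect the column from the user's previous positive samples only
    samples = samples.withColumn(new_col, extractTagsUdf(
        F.collect_list(F.when(F.col("label") == 1, F.col(c))).over(window)))
    # spread the first v values into fixed columns, padding with "-1"
    for i in range(1, v + 1):
        samples = samples.withColumn(new_col + "__" + str(i),
                                     F.when(F.col(new_col)[i - 1].isNotNull(),
                                            F.col(new_col)[i - 1]).otherwise("-1"))
    samples = samples.drop(new_col)
samples.show(truncate=False)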
@@ -151,7 +158,7 @@ def addUserFeatures(samples):
     # bucketing
     for c in ["userRatingCount","userRatingAvg","userRatingStddev"]:
-        pipelineStage.append(QuantileDiscretizer(numBuckets=10, inputCol=c, outputCol=c + "Bucket"))
+        pipelineStage.append(QuantileDiscretizer(numBuckets=20, inputCol=c, outputCol=c + "Bucket"))
     featurePipeline = Pipeline(stages=pipelineStage)
     samples = featurePipeline.fit(samples).transform(samples)
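QuantileDiscretizer fits quantile boundaries on the input column and writes bucket indices to outputCol, so doubling numBuckets to 20 gives finer resolution on the rating statistics at the cost of sparser buckets. A minimal sketch with a toy column, assuming the `spark` session from the sketch above:

from pyspark.ml import Pipeline
from pyspark.ml.feature import QuantileDiscretizer

ratings = spark.createDataFrame([(float(i),) for i in range(100)], ["userRatingAvg"])
stages = [QuantileDiscretizer(numBuckets=20, inputCol="userRatingAvg",
                              outputCol="userRatingAvgBucket")]
bucketed = Pipeline(stages=stages).fit(ratings).transform(ratings)
bucketed.show(5)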
...