Commit db3c60f5 authored by 郭羽

service model 优化

parent f74cd085
...@@ -335,8 +335,7 @@ def itemFeaturesToRedis(samples,itemDF,columns,redisKey): ...@@ -335,8 +335,7 @@ def itemFeaturesToRedis(samples,itemDF,columns,redisKey):
conn.set(newKey, v) conn.set(newKey, v)
conn.expire(newKey, 60 * 60 * 24 * 7) conn.expire(newKey, 60 * 60 * 24 * 7)
item_static_columns = [idCol] + [col for col in columns if col.endswith("Bucket") or col.endswith("_number")] item_static_columns = [idCol] + ["itemRatingCountBucket", "itemRatingAvgBucket", "itemClickCountBucket", "itemExpCountBucket","itemRatingStddev_number","itemCtr_number"]
#根据timestamp获取每个user最新的记录 #根据timestamp获取每个user最新的记录
prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol)) prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol))
item_static_df = samples.join(prefixSamples, on=[idCol], how='left').where(F.col("timestamp") == F.col(timestampCol)) item_static_df = samples.join(prefixSamples, on=[idCol], how='left').where(F.col("timestamp") == F.col(timestampCol))
......
...@@ -84,43 +84,25 @@ def getDataSet(df,shuffleSize = 10000,batchSize=128): ...@@ -84,43 +84,25 @@ def getDataSet(df,shuffleSize = 10000,batchSize=128):
def getTrainColumns(train_columns,data_vocab): def getTrainColumns(train_columns,data_vocab):
emb_columns = [] emb_columns = []
number_columns = [] number_columns = []
oneHot_columns = []
dataColumns = []
inputs = {} inputs = {}
# 离散特征 # 离散特征
for feature in train_columns: for feature in train_columns:
if data_vocab.get(feature): if data_vocab.get(feature):
if feature.count("__")>0:
cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature,vocabulary_list=data_vocab[feature])
col = tf.feature_column.embedding_column(cat_col, 5)
emb_columns.append(col)
dataColumns.append(feature)
inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
elif feature in one_hot_columns or feature.count("Bucket") > 0:
cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature, vocabulary_list=data_vocab[feature])
# col = tf.feature_column.indicator_column(cat_col)
col = tf.feature_column.embedding_column(cat_col, 3)
oneHot_columns.append(col)
dataColumns.append(feature)
inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
else:
cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature,vocabulary_list=data_vocab[feature]) cat_col = tf.feature_column.categorical_column_with_vocabulary_list(key=feature,vocabulary_list=data_vocab[feature])
col = tf.feature_column.embedding_column(cat_col, 10) col = tf.feature_column.embedding_column(cat_col, 10)
emb_columns.append(col) emb_columns.append(col)
dataColumns.append(feature)
inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string') inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
elif feature.endswith("_number"): elif feature.endswith("_number"):
col = tf.feature_column.numeric_column(feature) col = tf.feature_column.numeric_column(feature)
number_columns.append(col) number_columns.append(col)
dataColumns.append(feature)
inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='float32') inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='float32')
return emb_columns,number_columns,oneHot_columns,dataColumns,inputs return emb_columns,number_columns,inputs
def train(emb_columns, number_columns, oneHot_columns, inputs, train_dataset): def train(emb_columns, number_columns, inputs, train_dataset):
wide = tf.keras.layers.DenseFeatures(emb_columns + number_columns + oneHot_columns)(inputs) wide = tf.keras.layers.DenseFeatures(emb_columns + number_columns)(inputs)
deep = tf.keras.layers.Dense(64, activation='relu')(wide) deep = tf.keras.layers.Dense(64, activation='relu')(wide)
deep = tf.keras.layers.Dropout(0.2)(deep) deep = tf.keras.layers.Dropout(0.2)(deep)
concat_layer = tf.keras.layers.concatenate([wide, deep], axis=1) concat_layer = tf.keras.layers.concatenate([wide, deep], axis=1)
...@@ -204,8 +186,9 @@ if __name__ == '__main__': ...@@ -204,8 +186,9 @@ if __name__ == '__main__':
columns = df_train.columns.tolist() columns = df_train.columns.tolist()
print("原始数据列:") print("原始数据列:")
print(columns) print(columns)
emb_columns,number_columns,oneHot_columns, datasColumns,inputs = getTrainColumns(columns, data_vocab) emb_columns,number_columns,inputs = getTrainColumns(columns, data_vocab)
print("训练列:") print("训练列:")
datasColumns = list(inputs.keys())
print(datasColumns) print(datasColumns)
df_train = df_train[datasColumns + ["label"]] df_train = df_train[datasColumns + ["label"]]
...@@ -226,7 +209,7 @@ if __name__ == '__main__': ...@@ -226,7 +209,7 @@ if __name__ == '__main__':
print("train start...") print("train start...")
timestmp3 = int(round(time.time())) timestmp3 = int(round(time.time()))
model = train(emb_columns,number_columns,oneHot_columns,inputs,train_data) model = train(emb_columns,number_columns,inputs,train_data)
timestmp4 = int(round(time.time())) timestmp4 = int(round(time.time()))
print("train end...耗时h:{}".format((timestmp4 - timestmp3)/60/60)) print("train end...耗时h:{}".format((timestmp4 - timestmp3)/60/60))
......
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment