"""Wide & Deep demo: train a tf.estimator model on toy data and export it."""

import argparse
import os
import shutil

import pandas as pd
from sklearn.model_selection import train_test_split
import tensorflow as tf


def build_model_columns():
    """Build the feature columns used by the wide and deep parts.

    Returns:
        A ``(wide_columns, deep_columns)`` tuple of feature-column lists.
    """
    # Continuous column, scaled by a fixed constant.
    actual_price = tf.feature_column.numeric_column(
        'actual_price',
        normalizer_fn=lambda x: (x - 0) / 150000,
        dtype=tf.float32)

    # Categorical column with a fixed integer vocabulary.
    gender = tf.feature_column.categorical_column_with_vocabulary_list(
        'Gender', [1, -1, 0], dtype=tf.int64)

    # Bucketize the price.
    # NOTE(review): the boundaries look like raw-price values, but the source
    # column divides by 150000 first -- confirm which scale is intended.
    actual_price_bin = tf.feature_column.bucketized_column(
        actual_price, boundaries=[100, 250, 550, 1300])

    # Wide part: sparse/discrete features fed to the linear model.
    wide_columns = [actual_price_bin, gender]

    # Deep part: the categorical feature as a 10-dimensional embedding.
    gender_emb = tf.feature_column.embedding_column(gender, 10)
    deep_columns = [gender_emb]

    return wide_columns, deep_columns


def build_estimator(model_dir, model_type, warm_start_from=None):
    """Create the estimator selected by ``model_type``.

    Args:
        model_dir: Directory passed to the estimator as ``model_dir``.
        model_type: ``'wide'`` for a linear classifier, ``'deep'`` for a DNN
            classifier; any other value builds the combined wide & deep model.
        warm_start_from: Forwarded to the combined wide & deep estimator only;
            the ``'wide'`` and ``'deep'`` branches do not use it.

    Returns:
        The constructed ``tf.estimator`` classifier.
    """
    wide_columns, deep_columns = build_model_columns()

    # Units per fully connected hidden layer of the deep part (two layers).
    hidden_units = [50, 25]

    # Checkpoint every 100 steps and keep only the two most recent ones.
    run_config = tf.estimator.RunConfig().replace(
        save_checkpoints_steps=100,
        keep_checkpoint_max=2)

    if model_type == 'wide':
        return tf.estimator.LinearClassifier(
            model_dir=model_dir,
            feature_columns=wide_columns,
            config=run_config)

    if model_type == 'deep':
        return tf.estimator.DNNClassifier(
            model_dir=model_dir,
            feature_columns=deep_columns,
            hidden_units=hidden_units,
            config=run_config)

    return tf.estimator.DNNLinearCombinedClassifier(
        model_dir=model_dir,
        linear_feature_columns=wide_columns,
        dnn_feature_columns=deep_columns,
        dnn_hidden_units=hidden_units,
        config=run_config,
        warm_start_from=warm_start_from)


def read_pandas(data_file):
    """Load a CSV into memory and split it into train and test sets.

    Rows containing missing values are dropped. The split holds out 15% of
    the rows with a fixed random seed, and the ``"label"`` column is popped
    off each part as the target.

    Args:
        data_file: Path to the CSV file.

    Returns:
        A ``(train, test, y_train, y_test)`` tuple.
    """
    assert os.path.exists(data_file), ("%s not found." % data_file)
    df = pd.read_csv(data_file).dropna()
    train, test = train_test_split(df, test_size=0.15, random_state=1)
    y_train = train.pop("label")
    y_test = test.pop("label")
    return train, test, y_train, y_test


def input_fn(X, y, shuffle, batch_size, predict=False):
    """Build the ``tf.data.Dataset`` consumed by the estimator.

    Args:
        X: DataFrame of features; converted to a dict of column lists.
        y: Labels paired with ``X``; ignored when ``predict`` is true.
        shuffle: Whether to shuffle the examples.
        batch_size: Number of examples per batch.
        predict: When true, the dataset yields features only (no labels).

    Returns:
        A batched dataset with a prefetch buffer of one batch.
    """
    features = X.to_dict(orient='list')
    if predict:
        dataset = tf.data.Dataset.from_tensor_slices(features)
    else:
        dataset = tf.data.Dataset.from_tensor_slices((features, y))

    if shuffle:
        # A larger buffer gives a more thorough shuffle.
        dataset = dataset.shuffle(buffer_size=64)

    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(1)
    return dataset


def trainmain(train, y_train, test, y_test):
    """Train and evaluate the wide & deep model, then export it.

    Args:
        train: Training features.
        y_train: Training labels.
        test: Evaluation features.
        y_test: Evaluation labels.
    """
    model_dir = "./wide_deep_test"
    model_type = "wide_deep"
    model = build_estimator(model_dir, model_type)

    def train_input_fn():
        return input_fn(train, y_train, True, 1, predict=False)

    def eval_input_fn():
        return input_fn(test, y_test, False, 1, predict=False)

    # Epochs are driven by this outer loop rather than by dataset.repeat().
    for n in range(1):
        model.train(input_fn=train_input_fn)
        results = model.evaluate(input_fn=eval_input_fn)
        print('{0:-^30}'.format('evaluate at epoch %d' % (n + 1)))
        # `results` is a dict of metric name -> value.
        print(pd.Series(results).to_frame('values'))

    export_model(model, "saved_model_test")


def export_model(model, export_dir):
    """Export ``model`` as a SavedModel that accepts raw feature tensors.

    Args:
        model: A trained estimator.
        export_dir: Base directory handed to ``export_savedmodel``.
    """
    # Serving inputs are rank-1 tensors of length 2, keyed by feature name.
    features = {
        "Gender": tf.placeholder(
            dtype=tf.int64, shape=(2,), name='Gender'),
        "actual_price": tf.placeholder(
            dtype=tf.float32, shape=(2,), name='actual_price'),
    }
    example_input_fn = (
        tf.estimator.export.build_raw_serving_input_receiver_fn(features))
    # A stray zero-argument export_savedmodel() call used to precede this one;
    # it lacked the required arguments and raised before anything was written.
    model.export_savedmodel(
        export_dir, example_input_fn, as_text=False,
        strip_default_attrs=True)


# Toy smoke run on a six-row in-memory dataset; the same rows serve as both
# the training and the evaluation set. This runs whenever the module loads.
train_X = pd.DataFrame({
    "Gender": [1, 0, 1, 0, 1, 0],
    "actual_price": [10000.0, 10000.0, 10000.0, 10000.0, 10000.0, 10000.0],
})
train_Y = [1, 0, 1, 0, 1, 0]
trainmain(train_X, train_Y, train_X, train_Y)