Commit c713565e authored by 赵威's avatar 赵威

try predict tractate

parent d9f5cd31
import timeit
import pandas as pd import pandas as pd
from utils.cache import redis_db_client from utils.cache import redis_db_client
...@@ -135,4 +137,94 @@ def join_features(device_df, tractate_df, cc_df): ...@@ -135,4 +137,94 @@ def join_features(device_df, tractate_df, cc_df):
def device_tractate_fe(device_id, tractate_ids, device_dict, tractate_dict):
    """Build feature dicts for one device crossed with candidate tractates.

    Args:
        device_id: key into device_dict identifying the device.
        tractate_ids: iterable of keys into tractate_dict (candidates to score).
        device_dict: device_id -> device feature dict (presumably loaded from
            redis by the caller — values are copied, never mutated in place).
        tractate_dict: tractate_id -> tractate feature dict.

    Returns:
        (device_info, tractate_lst, tractate_ids_res):
            device_info  - device feature dict with "device_*" first-tag keys added,
            tractate_lst - per-candidate feature dicts with "content_*" and the
                           device/tractate common-tag keys (fd1..fd3, sd1..sd3, ...),
            tractate_ids_res - the "card_id" of each candidate that was found
                           in tractate_dict, in the same order as tractate_lst.

    Candidates missing from tractate_dict are silently skipped.
    """
    time_1 = timeit.default_timer()

    device_info = device_dict.get(device_id, {}).copy()
    if not device_info:
        # Cold-start fallback: neutral defaults for a device we have no profile for.
        device_info = {
            "device_id": device_id,
            "active_type": "1",
            "active_days": "0",
            "channel_first": "App Store",
            "city_first": "beijing",
            "model_first": "iPhone10",
            "past_consume_ability_history": "极弱",
            "potential_consume_ability_history": "极弱",
            "price_sensitive_history": "不敏感无消费",
            "device_click_num_1d": 0,
            "device_click_num_3d": 0,
            "device_click_num_7d": 0,
            "device_click_num_15d": 0,
            "device_click_num_30d": 0,
            "device_click_num_180d": 0,
            "click_tractate_id1": "-1",
            "click_tractate_id2": "-1",
            "click_tractate_id3": "-1",
            "click_tractate_id4": "-1",
            "click_tractate_id5": "-1"
        }

    # (output key suffix, source list field) pairs shared by device and tractate.
    tag_fields = [
        ("fd", "first_demands"),
        ("sd", "second_demands"),
        ("fs", "first_solutions"),
        ("ss", "second_solutions"),
        ("fp", "first_positions"),
        ("sp", "second_positions"),
        ("p", "projects"),
    ]

    # Cache the device tag lists and expose their first element as "device_<suffix>".
    device_tags = {}
    for suffix, field in tag_fields:
        values = device_info.get(field, [])
        device_tags[suffix] = values
        device_info["device_" + suffix] = nth_element(values, 0)

    tractate_lst = []
    tractate_ids_res = []
    for tractate_id in tractate_ids:  # renamed from `id` to avoid shadowing the builtin
        tractate_info = tractate_dict.get(tractate_id, {}).copy()
        if not tractate_info:
            continue
        tractate_ids_res.append(tractate_info.get("card_id", "-1"))
        for suffix, field in tag_fields:
            values = tractate_info.get(field, [])
            # First tag of the tractate itself.
            tractate_info["content_" + suffix] = nth_element(values, 0)
            # Top-3 tags shared between device and tractate: <suffix>1..<suffix>3.
            common = common_elements(device_tags[suffix], values)
            for i in range(3):
                tractate_info["{}{}".format(suffix, i + 1)] = nth_element(common, i)
        tractate_lst.append(tractate_info)

    total_1 = (timeit.default_timer() - time_1)
    print("join device tractate cost {:.5f}s".format(total_1))
    return device_info, tractate_lst, tractate_ids_res
import timeit
import tensorflow as tf
from .fe.tractate_fe import (CATEGORICAL_COLUMNS, FLOAT_COLUMNS, INT_COLUMNS, device_tractate_fe)
from .model import _bytes_feature, _float_feature, _int64_feature
def model_predict_tractate(device_id, tractate_ids, device_dict, tractate_dict, predict_fn):
    """Score candidate tractates for one device and return their ids ranked by score.

    Args:
        device_id: device key passed through to device_tractate_fe.
        tractate_ids: candidate tractate keys.
        device_dict / tractate_dict: feature lookup dicts (see device_tractate_fe).
        predict_fn: a tf.contrib.predictor callable taking {"examples": [bytes]}
            and returning a dict with an "output" array of scores — assumed
            aligned one-to-one with the serialized examples; TODO confirm
            against the exported model signature.

    Returns:
        List of tractate card ids (ints) sorted by predicted score, highest
        first. Returns [] on any failure (best-effort: errors are logged,
        never raised to the caller).
    """
    try:
        time_1 = timeit.default_timer()
        device_info, tractate_lst, tractate_ids_res = device_tractate_fe(
            device_id, tractate_ids, device_dict, tractate_dict)
        print("predict check: " + str(len(tractate_lst)) + " " + str(len(tractate_ids_res)))

        # Serialize one tf.train.Example per candidate; tractate features
        # override device features on key collision (tmp is device then tractate).
        examples = []
        for tractate_info in tractate_lst:
            tmp = {}
            tmp.update(device_info)
            tmp.update(tractate_info)
            features = {}
            for col in INT_COLUMNS:
                features[col] = _int64_feature(int(tmp[col]))
            for col in FLOAT_COLUMNS:
                features[col] = _float_feature(float(tmp[col]))
            for col in CATEGORICAL_COLUMNS:
                features[col] = _bytes_feature(str(tmp[col]).encode(encoding="utf-8"))
            example = tf.train.Example(features=tf.train.Features(feature=features))
            examples.append(example.SerializeToString())
        total_1 = (timeit.default_timer() - time_1)
        print("make example cost {:.5f}s".format(total_1))

        time_1 = timeit.default_timer()
        predictions = predict_fn({"examples": examples})
        # Rank candidate ids by predicted score, highest first.
        # (Previously this computation was commented out and the function
        # fell through returning None, breaking callers that slice the result.)
        res_tuple = sorted(
            zip(tractate_ids_res, predictions["output"].tolist()), key=lambda x: x[1], reverse=True)
        res = [int(i) for (i, _) in res_tuple]
        total_1 = (timeit.default_timer() - time_1)
        print("prediction cost {:.5f}s".format(total_1))
        return res
    except Exception as e:
        # Best-effort scoring: log the error and fall back to an empty ranking.
        print(e)
        return []
import os import os
import random
import shutil import shutil
import time import time
import timeit
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
...@@ -10,6 +12,7 @@ from sklearn.model_selection import train_test_split ...@@ -10,6 +12,7 @@ from sklearn.model_selection import train_test_split
from models.esmm.fe import click_fe, device_fe, fe, tractate_fe from models.esmm.fe import click_fe, device_fe, fe, tractate_fe
from models.esmm.input_fn import esmm_input_fn from models.esmm.input_fn import esmm_input_fn
from models.esmm.model import esmm_model_fn, model_export from models.esmm.model import esmm_model_fn, model_export
from models.esmm.tractate_model import model_predict_tractate
def main(): def main():
...@@ -17,52 +20,69 @@ def main(): ...@@ -17,52 +20,69 @@ def main():
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.INFO) tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.INFO)
# data_path = Path("~/data/cvr_data").expanduser() # local # # data_path = Path("~/data/cvr_data").expanduser() # local
data_path = Path("/srv/apps/node2vec_git/cvr_data/") # server # data_path = Path("/srv/apps/node2vec_git/cvr_data/") # server
tractate_df, tractate_click_df, tractate_conversion_df = tractate_fe.read_csv_data(data_path) # tractate_df, tractate_click_df, tractate_conversion_df = tractate_fe.read_csv_data(data_path)
tractate_df = tractate_fe.tractate_feature_engineering(tractate_df) # tractate_df = tractate_fe.tractate_feature_engineering(tractate_df)
device_df = device_fe.read_csv_data(data_path) # device_df = device_fe.read_csv_data(data_path)
device_df = device_fe.device_feature_engineering(device_df, "tractate") # device_df = device_fe.device_feature_engineering(device_df, "tractate")
# print(device_df.columns) # # print(device_df.columns)
# print(device_df.dtypes, "\n") # # print(device_df.dtypes, "\n")
cc_df = click_fe.click_feature_engineering(tractate_click_df, tractate_conversion_df) # cc_df = click_fe.click_feature_engineering(tractate_click_df, tractate_conversion_df)
df = tractate_fe.join_features(device_df, tractate_df, cc_df) # df = tractate_fe.join_features(device_df, tractate_df, cc_df)
# for i in df.columns: # # for i in df.columns:
# print(i) # # print(i)
# print(df.dtypes) # # print(df.dtypes)
train_df, test_df = train_test_split(df, test_size=0.2) # train_df, test_df = train_test_split(df, test_size=0.2)
train_df, val_df = train_test_split(train_df, test_size=0.2) # train_df, val_df = train_test_split(train_df, test_size=0.2)
all_features = fe.build_features(df, tractate_fe.INT_COLUMNS, tractate_fe.FLOAT_COLUMNS, tractate_fe.CATEGORICAL_COLUMNS) # all_features = fe.build_features(df, tractate_fe.INT_COLUMNS, tractate_fe.FLOAT_COLUMNS, tractate_fe.CATEGORICAL_COLUMNS)
params = {"feature_columns": all_features, "hidden_units": [64, 32], "learning_rate": 0.1} # params = {"feature_columns": all_features, "hidden_units": [64, 32], "learning_rate": 0.1}
model_path = str(Path("~/data/model_tmp/").expanduser()) # model_path = str(Path("~/data/model_tmp/").expanduser())
# if os.path.exists(model_path): # # if os.path.exists(model_path):
# shutil.rmtree(model_path) # # shutil.rmtree(model_path)
session_config = tf.compat.v1.ConfigProto() # session_config = tf.compat.v1.ConfigProto()
session_config.gpu_options.allow_growth = True # session_config.gpu_options.allow_growth = True
session_config.gpu_options.per_process_gpu_memory_fraction = 0.9 # session_config.gpu_options.per_process_gpu_memory_fraction = 0.9
estimator_config = tf.estimator.RunConfig(session_config=session_config) # estimator_config = tf.estimator.RunConfig(session_config=session_config)
model = tf.estimator.Estimator(model_fn=esmm_model_fn, params=params, model_dir=model_path, config=estimator_config) # model = tf.estimator.Estimator(model_fn=esmm_model_fn, params=params, model_dir=model_path, config=estimator_config)
train_spec = tf.estimator.TrainSpec(input_fn=lambda: esmm_input_fn(train_df, shuffle=True), max_steps=50000) # train_spec = tf.estimator.TrainSpec(input_fn=lambda: esmm_input_fn(train_df, shuffle=True), max_steps=50000)
eval_spec = tf.estimator.EvalSpec(input_fn=lambda: esmm_input_fn(val_df, shuffle=False)) # eval_spec = tf.estimator.EvalSpec(input_fn=lambda: esmm_input_fn(val_df, shuffle=False))
tf.estimator.train_and_evaluate(model, train_spec, eval_spec) # tf.estimator.train_and_evaluate(model, train_spec, eval_spec)
model_export_path = str(Path("~/data/models/tractate/").expanduser()) # model_export_path = str(Path("~/data/models/tractate/").expanduser())
save_path = model_export(model, all_features, model_export_path) # save_path = model_export(model, all_features, model_export_path)
print("save to: " + save_path) # print("save to: " + save_path)
total_time = (time.time() - time_begin) / 60 print("============================================================")
print("total cost {:.2f} mins at {}".format(total_time, datetime.now()))
# save_path = str(Path("~/data/models/tractate/1596089465").expanduser()) # local # # save_path = str(Path("~/data/models/tractate/1596089465").expanduser()) # local
# save_path = "/home/gmuser/data/models/tractate/" # server save_path = "/home/gmuser/data/models/tractate/1596092061" # server
predict_fn = tf.contrib.predictor.from_saved_model(save_path) predict_fn = tf.contrib.predictor.from_saved_model(save_path)
print("============================================================") device_dict = device_fe.get_device_dict_from_redis()
diary_dict = tractate_fe.get_diary_dict_from_redis()
device_ids = list(device_dict.keys())[:20]
diary_ids = list(diary_dict.keys())
print(device_dict(device_ids[0]), "\n")
print(diary_dict(diary_ids[0]), "\n")
for i in range(5):
time_1 = timeit.default_timer()
res = model_predict_tractate(
random.sample(device_ids, 1)[0], random.sample(diary_ids, 200), device_dict, diary_dict, predict_fn)
print(res[:10])
total_1 = (timeit.default_timer() - time_1)
print("total prediction cost {:.5f}s".format(total_1), "\n")
total_time = (time.time() - time_begin) / 60
print("total cost {:.2f} mins at {}".format(total_time, datetime.now()))
if __name__ == "__main__": if __name__ == "__main__":
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment