Commit 94cc8bd3 authored by 赵威's avatar 赵威

get data from redis

parent 45ec36ab
......@@ -4,3 +4,4 @@ redis_client = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN6@172.16.40.13
redis_client2 = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN9@172.16.40.173:6379")
redis_client3 = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN12@172.16.40.164:6379")
redis_client4 = redis.StrictRedis.from_url("redis://:XfkMCCdWDIU%ls$h@172.16.50.145:6379")
redis_db_client = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN14@172.16.40.146:6379")
import json
import pandas as pd
import tensorflow as tf
from ...cache import redis_db_client
from .utils import common_elements, nth_element
......@@ -13,6 +16,26 @@ def read_csv_data(dataset_path):
return device_df.sample(10000), diary_df.sample(5000), click_df, conversion_df
def _get_data_from_redis(key):
column_key = key + ":column"
d = redis_db_client.hgetall(key)
tmp = d.values()
lists = []
for i in tmp:
lists.append(str(i, "utf-8").split("|"))
columns = str(redis_db_client.get(column_key), "utf-8").split("|")
df = pd.DataFrame(lists, columns=columns)
return df
def get_device_df_from_redis():
return _get_data_from_redis("cvr:db:device")
def get_diary_df_from_redis():
return _get_data_from_redis("cvr:db:content:diary")
def device_feature_engineering(df):
device_df = df.copy()
......
......@@ -8,47 +8,52 @@ from pathlib import Path
import tensorflow as tf
from sklearn.model_selection import train_test_split
from models.esmm.fe import (click_feature_engineering, device_feature_engineering, diary_feature_engineering, join_features,
read_csv_data)
from models.esmm.input_fn import build_features, esmm_input_fn
from models.esmm.model import esmm_model_fn, model_export, model_predict
from esmm.fe import (click_feature_engineering, device_feature_engineering, diary_feature_engineering, join_features,
read_csv_data, get_device_df_from_redis, get_diary_df_from_redis)
from esmm.input_fn import build_features, esmm_input_fn
from esmm.model import esmm_model_fn, model_export, model_predict
# tf.compat.v1.enable_eager_execution()
def main():
time_begin = time.time()
device_df, diary_df, click_df, conversion_df = read_csv_data(Path("~/data/cvr_data/"))
device_df = device_feature_engineering(device_df)
diary_df = diary_feature_engineering(diary_df)
cc_df = click_feature_engineering(click_df, conversion_df)
df = join_features(device_df, diary_df, cc_df)
train_df, test_df = train_test_split(df, test_size=0.2)
train_df, val_df = train_test_split(train_df, test_size=0.2)
all_features = build_features(df)
params = {"feature_columns": all_features, "hidden_units": [64, 32], "learning_rate": 0.1}
model_path = str(Path("~/data/model_tmp/").expanduser())
if os.path.exists(model_path):
shutil.rmtree(model_path)
model = tf.estimator.Estimator(model_fn=esmm_model_fn, params=params, model_dir=model_path)
print("train")
model.train(input_fn=lambda: esmm_input_fn(train_df, shuffle=True), steps=5000)
metrics = model.evaluate(input_fn=lambda: esmm_input_fn(val_df, False), steps=5000)
print("metrics: " + str(metrics))
model_export_path = str(Path("~/data/models/").expanduser())
save_path = model_export(model, all_features, model_export_path)
print("save to: " + save_path)
predict_fn = tf.contrib.predictor.from_saved_model(save_path)
for i in range(10):
test_300 = test_df.sample(300)
model_predict(test_300, predict_fn)
df = get_device_df_from_redis()
df2 = get_diary_df_from_redis()
print(df.size)
print(df2.size)
# device_df, diary_df, click_df, conversion_df = read_csv_data(Path("~/data/cvr_data/"))
# device_df = device_feature_engineering(device_df)
# diary_df = diary_feature_engineering(diary_df)
# cc_df = click_feature_engineering(click_df, conversion_df)
# df = join_features(device_df, diary_df, cc_df)
# train_df, test_df = train_test_split(df, test_size=0.2)
# train_df, val_df = train_test_split(train_df, test_size=0.2)
# all_features = build_features(df)
# params = {"feature_columns": all_features, "hidden_units": [64, 32], "learning_rate": 0.1}
# model_path = str(Path("~/data/model_tmp/").expanduser())
# if os.path.exists(model_path):
# shutil.rmtree(model_path)
# model = tf.estimator.Estimator(model_fn=esmm_model_fn, params=params, model_dir=model_path)
# print("train")
# model.train(input_fn=lambda: esmm_input_fn(train_df, shuffle=True), steps=5000)
# metrics = model.evaluate(input_fn=lambda: esmm_input_fn(val_df, False), steps=5000)
# print("metrics: " + str(metrics))
# model_export_path = str(Path("~/data/models/").expanduser())
# save_path = model_export(model, all_features, model_export_path)
# print("save to: " + save_path)
# predict_fn = tf.contrib.predictor.from_saved_model(save_path)
# for i in range(10):
# test_300 = test_df.sample(300)
# model_predict(test_300, predict_fn)
total_time = (time.time() - time_begin) / 60
print("cost {:.2f} mins at {}".format(total_time, datetime.now()))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment