Commit 932e5762 authored by 赵威's avatar 赵威

update fe

parent f55521ba
...@@ -10,8 +10,8 @@ import pandas as pd ...@@ -10,8 +10,8 @@ import pandas as pd
import tensorflow as tf import tensorflow as tf
from sklearn.model_selection import train_test_split from sklearn.model_selection import train_test_split
from models.esmm.fe import (click_feature_engineering, device_feature_engineering, diary_feature_engineering, from models.esmm import device_fe as device_fe
get_device_dict_from_redis, get_diary_dict_from_redis, join_features, read_csv_data) from models.esmm import diary_fe as diary_fe
from models.esmm.input_fn import build_features, esmm_input_fn from models.esmm.input_fn import build_features, esmm_input_fn
from models.esmm.model import esmm_model_fn, model_export, model_predict_diary from models.esmm.model import esmm_model_fn, model_export, model_predict_diary
...@@ -25,15 +25,18 @@ def main(): ...@@ -25,15 +25,18 @@ def main():
# os.environ["CUDA_VISIBLE_DEVICES"] = "-1" # os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
# device_df, diary_df, click_df, conversion_df = read_csv_data(Path("~/data/cvr_data").expanduser()) # device_df, diary_df, click_df, conversion_df = diary_fe.read_csv_data(Path("~/data/cvr_data").expanduser())
device_df, diary_df, click_df, conversion_df = read_csv_data(Path("/srv/apps/node2vec_git/cvr_data/")) device_df, diary_df, click_df, conversion_df = diary_fe.read_csv_data(Path("/srv/apps/node2vec_git/cvr_data/"))
# print(diary_df.sample(1)) # print(diary_df.sample(1))
device_df = device_feature_engineering(device_df) device_df = device_fe.device_feature_engineering(device_df)
# print(device_df.sample(1)) print(device_df.sample(1))
diary_df = diary_feature_engineering(diary_df) diary_df = diary_fe.diary_feature_engineering(diary_df)
# print(diary_df.sample(1)) print(diary_df.sample(1))
cc_df = click_feature_engineering(click_df, conversion_df) cc_df = diary_fe.click_feature_engineering(click_df, conversion_df)
df = join_features(device_df, diary_df, cc_df) print(cc_df.sample(1))
df = diary_fe.join_features(device_df, diary_df, cc_df)
print(df.sample(1))
print(df.dtypes)
train_df, test_df = train_test_split(df, test_size=0.2) train_df, test_df = train_test_split(df, test_size=0.2)
train_df, val_df = train_test_split(train_df, test_size=0.2) train_df, val_df = train_test_split(train_df, test_size=0.2)
...@@ -41,8 +44,8 @@ def main(): ...@@ -41,8 +44,8 @@ def main():
all_features = build_features(df) all_features = build_features(df)
params = {"feature_columns": all_features, "hidden_units": [64, 32], "learning_rate": 0.1} params = {"feature_columns": all_features, "hidden_units": [64, 32], "learning_rate": 0.1}
model_path = str(Path("~/data/model_tmp/").expanduser()) model_path = str(Path("~/data/model_tmp/").expanduser())
if os.path.exists(model_path): # if os.path.exists(model_path):
shutil.rmtree(model_path) # shutil.rmtree(model_path)
session_config = tf.compat.v1.ConfigProto() session_config = tf.compat.v1.ConfigProto()
session_config.gpu_options.allow_growth = True session_config.gpu_options.allow_growth = True
...@@ -50,7 +53,8 @@ def main(): ...@@ -50,7 +53,8 @@ def main():
estimator_config = tf.estimator.RunConfig(session_config=session_config) estimator_config = tf.estimator.RunConfig(session_config=session_config)
model = tf.estimator.Estimator(model_fn=esmm_model_fn, params=params, model_dir=model_path, config=estimator_config) model = tf.estimator.Estimator(model_fn=esmm_model_fn, params=params, model_dir=model_path, config=estimator_config)
train_spec = tf.estimator.TrainSpec(input_fn=lambda: esmm_input_fn(train_df, shuffle=True), max_steps=50000) # TODO 50000
train_spec = tf.estimator.TrainSpec(input_fn=lambda: esmm_input_fn(train_df, shuffle=True), max_steps=20000)
eval_spec = tf.estimator.EvalSpec(input_fn=lambda: esmm_input_fn(val_df, shuffle=False)) eval_spec = tf.estimator.EvalSpec(input_fn=lambda: esmm_input_fn(val_df, shuffle=False))
tf.estimator.train_and_evaluate(model, train_spec, eval_spec) tf.estimator.train_and_evaluate(model, train_spec, eval_spec)
...@@ -76,8 +80,8 @@ def main(): ...@@ -76,8 +80,8 @@ def main():
# "16195283", "16838351", "17161073", "17297878", "17307484", "17396235", "16418737", "16995481", "17312201", "12237988" # "16195283", "16838351", "17161073", "17297878", "17307484", "17396235", "16418737", "16995481", "17312201", "12237988"
# ] # ]
device_dict = get_device_dict_from_redis() device_dict = diary_fe.get_device_dict_from_redis()
diary_dict = get_diary_dict_from_redis() diary_dict = diary_fe.get_diary_dict_from_redis()
device_ids = list(device_dict.keys())[:20] device_ids = list(device_dict.keys())[:20]
diary_ids = list(diary_dict.keys()) diary_ids = list(diary_dict.keys())
......
from utils.cache import redis_db_client
# "channel_first", "city_first", "model_first",
DIARY_DEVICE_COLUMNS = [
"device_id", "active_type", "active_days", "past_consume_ability_history", "potential_consume_ability_history",
"price_sensitive_history", "first_demands", "second_demands", "first_solutions", "second_solutions", "first_positions",
"second_positions", "projects"
]
def get_device_dict_from_redis():
"""
return: {device_id: {first_demands: [], city_first: ""}}
"""
# TODO
db_key = "cvr:db:device2"
column_key = db_key + ":column"
columns = str(redis_db_client.get(column_key), "utf-8").split("|")
d = redis_db_client.hgetall(db_key)
res = {}
for i in d.values():
row_list = str(i, "utf-8").split("|")
tmp = {}
for (index, elem) in enumerate(row_list):
col_name = columns[index]
if col_name in [
"first_demands", "second_demands", "first_solutions", "second_solutions", "first_positions",
"second_positions", "projects"
]:
tmp[col_name] = elem.split(",")
else:
tmp[col_name] = elem
res[tmp["device_id"]] = tmp
return res
def device_feature_engineering(df):
device_df = df.copy()
device_df["first_demands"] = device_df["first_demands"].str.split(",")
device_df["second_demands"] = device_df["second_demands"].str.split(",")
device_df["first_solutions"] = device_df["first_solutions"].str.split(",")
device_df["second_solutions"] = device_df["second_solutions"].str.split(",")
device_df["first_positions"] = device_df["first_positions"].str.split(",")
device_df["second_positions"] = device_df["second_positions"].str.split(",")
device_df["projects"] = device_df["projects"].str.split(",")
device_df["first_demands"] = device_df["first_demands"].apply(lambda d: d if isinstance(d, list) else [])
device_df["second_demands"] = device_df["second_demands"].apply(lambda d: d if isinstance(d, list) else [])
device_df["first_solutions"] = device_df["first_solutions"].apply(lambda d: d if isinstance(d, list) else [])
device_df["second_solutions"] = device_df["second_solutions"].apply(lambda d: d if isinstance(d, list) else [])
device_df["first_positions"] = device_df["first_positions"].apply(lambda d: d if isinstance(d, list) else [])
device_df["second_positions"] = device_df["second_positions"].apply(lambda d: d if isinstance(d, list) else [])
device_df["projects"] = device_df["projects"].apply(lambda d: d if isinstance(d, list) else [])
device_df["city_first"] = device_df["city_first"].fillna("")
device_df["model_first"] = device_df["model_first"].fillna("")
nullseries = device_df.isnull().sum()
print("device:")
print(nullseries[nullseries > 0])
print(device_df.shape)
return device_df[DIARY_DEVICE_COLUMNS]
import timeit
import pandas as pd
from utils.cache import redis_db_client
from .utils import common_elements, nth_element
DIARY_COLUMNS = [
"card_id", "is_pure_author", "is_have_reply", "is_have_pure_reply", "content_level", "topic_num", "favor_num", "vote_num",
"one_ctr", "three_ctr", "seven_ctr", "fifteen_ctr", "first_demands", "second_demands", "first_solutions", "second_solutions",
"first_positions", "second_positions", "projects"
]
def read_csv_data(dataset_path):
device_df = pd.read_csv(dataset_path.joinpath("device.csv"), sep="|")
device_df.drop_duplicates(subset=["device_id"], inplace=True)
diary_df = pd.read_csv(dataset_path.joinpath("diary.csv"), sep="|")
click_df = pd.read_csv(dataset_path.joinpath("diary_click.csv"), sep="|")
conversion_df = pd.read_csv(dataset_path.joinpath("diary_click_cvr.csv"), sep="|")
return device_df, diary_df, click_df, conversion_df
def get_diary_dict_from_redis():
"""
return: {diary_id: {first_demands: [], is_pure_author: 1}}
"""
db_key = "cvr:db:content:diary"
column_key = db_key + ":column"
columns = str(redis_db_client.get(column_key), "utf-8").split("|")
d = redis_db_client.hgetall(db_key)
res = {}
for i in d.values():
row_list = str(i, "utf-8").split("|")
tmp = {}
for (index, elem) in enumerate(row_list):
col_name = columns[index]
if col_name in [
"first_demands", "second_demands", "first_solutions", "second_solutions", "first_positions",
"second_positions", "projects"
]:
tmp[col_name] = elem.split(",")
elif col_name in ["is_pure_author", "is_have_pure_reply", "is_have_reply"]:
if elem == "true":
tmp[col_name] = 1
else:
tmp[col_name] = 0
else:
tmp[col_name] = elem
res[int(tmp["card_id"])] = tmp
return res
def diary_feature_engineering(df):
diary_df = df.copy()
diary_df["first_demands"] = diary_df["first_demands"].str.split(",")
diary_df["second_demands"] = diary_df["second_demands"].str.split(",")
diary_df["first_solutions"] = diary_df["first_solutions"].str.split(",")
diary_df["second_solutions"] = diary_df["second_solutions"].str.split(",")
diary_df["first_positions"] = diary_df["first_positions"].str.split(",")
diary_df["second_positions"] = diary_df["second_positions"].str.split(",")
diary_df["projects"] = diary_df["projects"].str.split(",")
diary_df["first_demands"] = diary_df["first_demands"].apply(lambda d: d if isinstance(d, list) else [])
diary_df["second_demands"] = diary_df["second_demands"].apply(lambda d: d if isinstance(d, list) else [])
diary_df["first_solutions"] = diary_df["first_solutions"].apply(lambda d: d if isinstance(d, list) else [])
diary_df["second_solutions"] = diary_df["second_solutions"].apply(lambda d: d if isinstance(d, list) else [])
diary_df["first_positions"] = diary_df["first_positions"].apply(lambda d: d if isinstance(d, list) else [])
diary_df["second_positions"] = diary_df["second_positions"].apply(lambda d: d if isinstance(d, list) else [])
diary_df["projects"] = diary_df["projects"].apply(lambda d: d if isinstance(d, list) else [])
diary_df["is_pure_author"] = diary_df["is_pure_author"].astype(int)
diary_df["is_have_pure_reply"] = diary_df["is_have_pure_reply"].astype(int)
diary_df["is_have_reply"] = diary_df["is_have_reply"].astype(int)
print("diary:")
nullseries = diary_df.isnull().sum()
print(nullseries[nullseries > 0])
print(diary_df.shape)
return diary_df[DIARY_COLUMNS]
def click_feature_engineering(click_df, conversion_df):
# click_df = click_df.copy()
# conversion_df = conversion_df.copy()
click_df.rename(columns={"label": "click_label"}, inplace=True)
conversion_df.rename(columns={"label": "conversion_label"}, inplace=True)
cc_df = pd.merge(click_df, conversion_df, how="left", left_on=["cl_id", "card_id"], right_on=["cl_id", "card_id"])
cc_df.drop(["partition_date_x", "partition_date_y"], axis=1, inplace=True)
cc_df["conversion_label"].fillna(0, inplace=True)
print("click:")
nullseries = cc_df.isnull().sum()
print(nullseries[nullseries > 0])
print(cc_df.shape)
return cc_df
def join_features(device_df, diary_df, cc_df):
a = pd.merge(device_df, cc_df, how="inner", left_on="device_id", right_on="cl_id")
df = pd.merge(a, diary_df, how="inner", left_on="card_id", right_on="card_id")
df["first_demands"] = df[["first_demands_x", "first_demands_y"]].apply(lambda x: common_elements(*x), axis=1)
df["second_demands"] = df[["second_demands_x", "second_demands_y"]].apply(lambda x: common_elements(*x), axis=1)
df["first_solutions"] = df[["first_solutions_x", "first_solutions_y"]].apply(lambda x: common_elements(*x), axis=1)
df["second_solutions"] = df[["second_solutions_x", "second_solutions_y"]].apply(lambda x: common_elements(*x), axis=1)
df["first_positions"] = df[["first_positions_x", "second_positions_y"]].apply(lambda x: common_elements(*x), axis=1)
df["second_positions"] = df[["second_positions_x", "second_positions_y"]].apply(lambda x: common_elements(*x), axis=1)
df["projects"] = df[["projects_x", "projects_y"]].apply(lambda x: common_elements(*x), axis=1)
df["device_fd"] = df["first_demands_x"].apply(lambda x: nth_element(x, 0))
df["device_sd"] = df["second_demands_x"].apply(lambda x: nth_element(x, 0))
df["device_fs"] = df["first_solutions_x"].apply(lambda x: nth_element(x, 0))
df["device_ss"] = df["second_solutions_x"].apply(lambda x: nth_element(x, 0))
df["device_fp"] = df["first_positions_x"].apply(lambda x: nth_element(x, 0))
df["device_sp"] = df["second_positions_x"].apply(lambda x: nth_element(x, 0))
df["device_p"] = df["projects_x"].apply(lambda x: nth_element(x, 0))
df["content_fd"] = df["first_demands_y"].apply(lambda x: nth_element(x, 0))
df["content_sd"] = df["second_demands_y"].apply(lambda x: nth_element(x, 0))
df["content_fs"] = df["first_solutions_y"].apply(lambda x: nth_element(x, 0))
df["content_ss"] = df["second_solutions_y"].apply(lambda x: nth_element(x, 0))
df["content_fp"] = df["first_positions_y"].apply(lambda x: nth_element(x, 0))
df["content_sp"] = df["second_positions_y"].apply(lambda x: nth_element(x, 0))
df["content_p"] = df["projects_y"].apply(lambda x: nth_element(x, 0))
df["fd1"] = df["first_demands"].apply(lambda x: nth_element(x, 0))
df["fd2"] = df["first_demands"].apply(lambda x: nth_element(x, 1))
df["fd3"] = df["first_demands"].apply(lambda x: nth_element(x, 2))
df["sd1"] = df["second_demands"].apply(lambda x: nth_element(x, 0))
df["sd2"] = df["second_demands"].apply(lambda x: nth_element(x, 1))
df["sd3"] = df["second_demands"].apply(lambda x: nth_element(x, 2))
df["fs1"] = df["first_solutions"].apply(lambda x: nth_element(x, 0))
df["fs2"] = df["first_solutions"].apply(lambda x: nth_element(x, 1))
df["fs3"] = df["first_solutions"].apply(lambda x: nth_element(x, 2))
df["ss1"] = df["second_solutions"].apply(lambda x: nth_element(x, 0))
df["ss2"] = df["second_solutions"].apply(lambda x: nth_element(x, 1))
df["ss3"] = df["second_solutions"].apply(lambda x: nth_element(x, 2))
df["fp1"] = df["first_positions"].apply(lambda x: nth_element(x, 0))
df["fp2"] = df["first_positions"].apply(lambda x: nth_element(x, 1))
df["fp3"] = df["first_positions"].apply(lambda x: nth_element(x, 2))
df["sp1"] = df["second_positions"].apply(lambda x: nth_element(x, 0))
df["sp2"] = df["second_positions"].apply(lambda x: nth_element(x, 1))
df["sp3"] = df["second_positions"].apply(lambda x: nth_element(x, 2))
df["p1"] = df["projects"].apply(lambda x: nth_element(x, 0))
df["p2"] = df["projects"].apply(lambda x: nth_element(x, 1))
df["p3"] = df["projects"].apply(lambda x: nth_element(x, 2))
print("df:")
nullseries = df.isnull().sum()
print(nullseries[nullseries > 0])
print(df.shape)
drop_columns = [
"cl_id", "first_demands_x", "first_demands_y", "first_demands", "second_demands_x", "second_demands_y", "second_demands",
"first_solutions_x", "first_solutions_y", "first_solutions", "second_solutions_x", "second_solutions_y",
"second_solutions", "first_positions_x", "first_positions_y", "first_positions", "second_positions_x",
"second_positions_y", "second_positions", "projects_x", "projects_y", "projects"
]
# for col in drop_columns:
# if col in df.columns:
# df.drop(col, inplace=True, axis=1)
df.drop(drop_columns, inplace=True, axis=1)
return df
def device_diary_fe(device_id, diary_ids, device_dict, diary_dict):
time_1 = timeit.default_timer()
device_info = device_dict.get(device_id, {}).copy()
if not device_info:
device_info = {
"device_id": device_id,
"active_type": "1",
"active_days": "0",
"past_consume_ability_history": "极弱",
"potential_consume_ability_history": "极弱",
"price_sensitive_history": "不敏感无消费",
"device_click_num_1d": 0,
"device_click_num_3d": 0,
"device_click_num_7d": 0,
"device_click_num_15d": 0,
"device_click_num_30d": 0,
"device_click_num_180d": 0
}
device_fd = device_info.get("first_demands", [])
device_sd = device_info.get("second_demands", [])
device_fs = device_info.get("first_solutions", [])
device_ss = device_info.get("second_solutions", [])
device_fp = device_info.get("first_positions", [])
device_sp = device_info.get("second_positions", [])
device_p = device_info.get("projects", [])
device_info["device_fd"] = nth_element(device_fd, 0)
device_info["device_sd"] = nth_element(device_sd, 0)
device_info["device_fs"] = nth_element(device_fs, 0)
device_info["device_ss"] = nth_element(device_ss, 0)
device_info["device_fp"] = nth_element(device_fp, 0)
device_info["device_sp"] = nth_element(device_sp, 0)
device_info["device_p"] = nth_element(device_p, 0)
diary_lst = []
diary_ids_res = []
for id in diary_ids:
diary_info = diary_dict.get(id, {}).copy()
if diary_info:
diary_ids_res.append(diary_info.get("card_id", "-1"))
diary_fd = diary_info.get("first_demands", [])
diary_sd = diary_info.get("second_demands", [])
diary_fs = diary_info.get("first_solutions", [])
diary_ss = diary_info.get("second_solutions", [])
diary_fp = diary_info.get("first_positions", [])
diary_sp = diary_info.get("second_positions", [])
diary_p = diary_info.get("projects", [])
common_fd = common_elements(device_fd, diary_fd)
common_sd = common_elements(device_sd, diary_sd)
common_fs = common_elements(device_fs, diary_fs)
common_ss = common_elements(device_ss, diary_ss)
common_fp = common_elements(device_fp, diary_fp)
common_sp = common_elements(device_sp, diary_sp)
common_p = common_elements(device_p, diary_p)
diary_info["content_fd"] = nth_element(diary_fd, 0)
diary_info["content_sd"] = nth_element(diary_sd, 0)
diary_info["content_fs"] = nth_element(diary_fs, 0)
diary_info["content_ss"] = nth_element(diary_ss, 0)
diary_info["content_fp"] = nth_element(diary_fp, 0)
diary_info["content_sp"] = nth_element(diary_sp, 0)
diary_info["content_p"] = nth_element(diary_p, 0)
diary_info["fd1"] = nth_element(common_fd, 0)
diary_info["fd2"] = nth_element(common_fd, 1)
diary_info["fd3"] = nth_element(common_fd, 2)
diary_info["sd1"] = nth_element(common_sd, 0)
diary_info["sd2"] = nth_element(common_sd, 1)
diary_info["sd3"] = nth_element(common_sd, 2)
diary_info["fs1"] = nth_element(common_fs, 0)
diary_info["fs2"] = nth_element(common_fs, 1)
diary_info["fs3"] = nth_element(common_fs, 2)
diary_info["ss1"] = nth_element(common_ss, 0)
diary_info["ss2"] = nth_element(common_ss, 1)
diary_info["ss3"] = nth_element(common_ss, 2)
diary_info["fp1"] = nth_element(common_fp, 0)
diary_info["fp2"] = nth_element(common_fp, 1)
diary_info["fp3"] = nth_element(common_fp, 2)
diary_info["sp1"] = nth_element(common_sp, 0)
diary_info["sp2"] = nth_element(common_sp, 1)
diary_info["sp3"] = nth_element(common_sp, 2)
diary_info["p1"] = nth_element(common_p, 0)
diary_info["p2"] = nth_element(common_p, 1)
diary_info["p3"] = nth_element(common_p, 2)
diary_lst.append(diary_info)
total_1 = (timeit.default_timer() - time_1)
print("join device diary cost {:.5f}s".format(total_1))
return device_info, diary_lst, diary_ids_res
import timeit
import pandas as pd
from utils.cache import redis_db_client
from .utils import common_elements, nth_element
def read_csv_data(dataset_path):
device_df = pd.read_csv(dataset_path.joinpath("device.csv"), sep="|")
device_df.drop_duplicates(subset=["device_id"], inplace=True)
diary_df = pd.read_csv(dataset_path.joinpath("diary.csv"), sep="|")
click_df = pd.read_csv(dataset_path.joinpath("diary_click.csv"), sep="|")
conversion_df = pd.read_csv(dataset_path.joinpath("diary_click_cvr.csv"), sep="|")
return device_df, diary_df, click_df, conversion_df
def get_device_dict_from_redis():
# TODO
db_key = "cvr:db:device2"
column_key = db_key + ":column"
columns = str(redis_db_client.get(column_key), "utf-8").split("|")
d = redis_db_client.hgetall(db_key)
res = {}
for i in d.values():
row_list = str(i, "utf-8").split("|")
tmp = {}
for (index, elem) in enumerate(row_list):
col_name = columns[index]
if col_name in [
"first_demands", "second_demands", "first_solutions", "second_solutions", "first_positions",
"second_positions", "projects"
]:
tmp[col_name] = elem.split(",")
else:
tmp[col_name] = elem
res[tmp["device_id"]] = tmp
return res
def get_diary_dict_from_redis():
db_key = "cvr:db:content:diary"
column_key = db_key + ":column"
columns = str(redis_db_client.get(column_key), "utf-8").split("|")
d = redis_db_client.hgetall(db_key)
res = {}
for i in d.values():
row_list = str(i, "utf-8").split("|")
tmp = {}
for (index, elem) in enumerate(row_list):
col_name = columns[index]
if col_name in [
"first_demands", "second_demands", "first_solutions", "second_solutions", "first_positions",
"second_positions", "projects"
]:
tmp[col_name] = elem.split(",")
elif col_name in ["is_pure_author", "is_have_pure_reply", "is_have_reply"]:
if elem == "true":
tmp[col_name] = 1
else:
tmp[col_name] = 0
else:
tmp[col_name] = elem
res[int(tmp["card_id"])] = tmp
return res
def device_feature_engineering(df):
device_df = df.copy()
device_df["first_demands"] = device_df["first_demands"].str.split(",")
device_df["second_demands"] = device_df["second_demands"].str.split(",")
device_df["first_solutions"] = device_df["first_solutions"].str.split(",")
device_df["second_solutions"] = device_df["second_solutions"].str.split(",")
device_df["first_positions"] = device_df["first_positions"].str.split(",")
device_df["second_positions"] = device_df["second_positions"].str.split(",")
device_df["projects"] = device_df["projects"].str.split(",")
device_df["first_demands"] = device_df["first_demands"].apply(lambda d: d if isinstance(d, list) else [])
device_df["second_demands"] = device_df["second_demands"].apply(lambda d: d if isinstance(d, list) else [])
device_df["first_solutions"] = device_df["first_solutions"].apply(lambda d: d if isinstance(d, list) else [])
device_df["second_solutions"] = device_df["second_solutions"].apply(lambda d: d if isinstance(d, list) else [])
device_df["first_positions"] = device_df["first_positions"].apply(lambda d: d if isinstance(d, list) else [])
device_df["second_positions"] = device_df["second_positions"].apply(lambda d: d if isinstance(d, list) else [])
device_df["projects"] = device_df["projects"].apply(lambda d: d if isinstance(d, list) else [])
device_df["city_first"] = device_df["city_first"].fillna("")
device_df["model_first"] = device_df["model_first"].fillna("")
nullseries = device_df.isnull().sum()
print("device:")
print(nullseries[nullseries > 0])
print(device_df.shape)
device_columns = [
"device_id", "active_type", "active_days", "channel_first", "city_first", "model_first", "past_consume_ability_history",
"potential_consume_ability_history", "price_sensitive_history", "first_demands", "second_demands", "first_solutions",
"second_solutions", "first_positions", "second_positions", "projects"
]
return device_df[device_columns]
def diary_feature_engineering(df, from_redis=False):
diary_df = df.copy()
str_bool_map = {"true": True, "false": False}
diary_df["first_demands"] = diary_df["first_demands"].str.split(",")
diary_df["second_demands"] = diary_df["second_demands"].str.split(",")
diary_df["first_solutions"] = diary_df["first_solutions"].str.split(",")
diary_df["second_solutions"] = diary_df["second_solutions"].str.split(",")
diary_df["first_positions"] = diary_df["first_positions"].str.split(",")
diary_df["second_positions"] = diary_df["second_positions"].str.split(",")
diary_df["projects"] = diary_df["projects"].str.split(",")
diary_df["first_demands"] = diary_df["first_demands"].apply(lambda d: d if isinstance(d, list) else [])
diary_df["second_demands"] = diary_df["second_demands"].apply(lambda d: d if isinstance(d, list) else [])
diary_df["first_solutions"] = diary_df["first_solutions"].apply(lambda d: d if isinstance(d, list) else [])
diary_df["second_solutions"] = diary_df["second_solutions"].apply(lambda d: d if isinstance(d, list) else [])
diary_df["first_positions"] = diary_df["first_positions"].apply(lambda d: d if isinstance(d, list) else [])
diary_df["second_positions"] = diary_df["second_positions"].apply(lambda d: d if isinstance(d, list) else [])
diary_df["projects"] = diary_df["projects"].apply(lambda d: d if isinstance(d, list) else [])
if from_redis:
diary_df["is_pure_author"] = diary_df["is_pure_author"].map(str_bool_map)
diary_df["is_have_pure_reply"] = diary_df["is_have_pure_reply"].map(str_bool_map)
diary_df["is_have_reply"] = diary_df["is_have_reply"].map(str_bool_map)
diary_df["is_pure_author"] = diary_df["is_pure_author"].astype(int)
diary_df["is_have_pure_reply"] = diary_df["is_have_pure_reply"].astype(int)
diary_df["is_have_reply"] = diary_df["is_have_reply"].astype(int)
print("diary:")
nullseries = diary_df.isnull().sum()
print(nullseries[nullseries > 0])
print(diary_df.shape)
diary_columns = [
"card_id", "is_pure_author", "is_have_reply", "is_have_pure_reply", "content_level", "topic_num", "favor_num", "vote_num",
"one_ctr", "three_ctr", "seven_ctr", "fifteen_ctr", "first_demands", "second_demands", "first_solutions",
"second_solutions", "first_positions", "second_positions", "projects"
]
return diary_df[diary_columns]
def click_feature_engineering(click_df, conversion_df):
# click_df = click_df.copy()
# conversion_df = conversion_df.copy()
click_df.rename(columns={"label": "click_label"}, inplace=True)
conversion_df.rename(columns={"label": "conversion_label"}, inplace=True)
cc_df = pd.merge(click_df, conversion_df, how="left", left_on=["cl_id", "card_id"], right_on=["cl_id", "card_id"])
cc_df.drop(["partition_date_x", "partition_date_y"], axis=1, inplace=True)
cc_df["conversion_label"].fillna(0, inplace=True)
print("click:")
nullseries = cc_df.isnull().sum()
print(nullseries[nullseries > 0])
print(cc_df.shape)
return cc_df
def join_features(device_df, diary_df, cc_df):
a = pd.merge(device_df, cc_df, how="inner", left_on="device_id", right_on="cl_id")
df = pd.merge(a, diary_df, how="inner", left_on="card_id", right_on="card_id")
df["first_demands"] = df[["first_demands_x", "first_demands_y"]].apply(lambda x: common_elements(*x), axis=1)
df["second_demands"] = df[["second_demands_x", "second_demands_y"]].apply(lambda x: common_elements(*x), axis=1)
df["first_solutions"] = df[["first_solutions_x", "first_solutions_y"]].apply(lambda x: common_elements(*x), axis=1)
df["second_solutions"] = df[["second_solutions_x", "second_solutions_y"]].apply(lambda x: common_elements(*x), axis=1)
df["first_positions"] = df[["first_positions_x", "second_positions_y"]].apply(lambda x: common_elements(*x), axis=1)
df["second_positions"] = df[["second_positions_x", "second_positions_y"]].apply(lambda x: common_elements(*x), axis=1)
df["projects"] = df[["projects_x", "projects_y"]].apply(lambda x: common_elements(*x), axis=1)
df["device_fd"] = df["first_demands_x"].apply(lambda x: nth_element(x, 0))
df["device_sd"] = df["second_demands_x"].apply(lambda x: nth_element(x, 0))
df["device_fs"] = df["first_solutions_x"].apply(lambda x: nth_element(x, 0))
df["device_ss"] = df["second_solutions_x"].apply(lambda x: nth_element(x, 0))
df["device_fp"] = df["first_positions_x"].apply(lambda x: nth_element(x, 0))
df["device_sp"] = df["second_positions_x"].apply(lambda x: nth_element(x, 0))
df["device_p"] = df["projects_x"].apply(lambda x: nth_element(x, 0))
df["content_fd"] = df["first_demands_y"].apply(lambda x: nth_element(x, 0))
df["content_sd"] = df["second_demands_y"].apply(lambda x: nth_element(x, 0))
df["content_fs"] = df["first_solutions_y"].apply(lambda x: nth_element(x, 0))
df["content_ss"] = df["second_solutions_y"].apply(lambda x: nth_element(x, 0))
df["content_fp"] = df["first_positions_y"].apply(lambda x: nth_element(x, 0))
df["content_sp"] = df["second_positions_y"].apply(lambda x: nth_element(x, 0))
df["content_p"] = df["projects_y"].apply(lambda x: nth_element(x, 0))
df["fd1"] = df["first_demands"].apply(lambda x: nth_element(x, 0))
df["fd2"] = df["first_demands"].apply(lambda x: nth_element(x, 1))
df["fd3"] = df["first_demands"].apply(lambda x: nth_element(x, 2))
df["sd1"] = df["second_demands"].apply(lambda x: nth_element(x, 0))
df["sd2"] = df["second_demands"].apply(lambda x: nth_element(x, 1))
df["sd3"] = df["second_demands"].apply(lambda x: nth_element(x, 2))
df["fs1"] = df["first_solutions"].apply(lambda x: nth_element(x, 0))
df["fs2"] = df["first_solutions"].apply(lambda x: nth_element(x, 1))
df["fs3"] = df["first_solutions"].apply(lambda x: nth_element(x, 2))
df["ss1"] = df["second_solutions"].apply(lambda x: nth_element(x, 0))
df["ss2"] = df["second_solutions"].apply(lambda x: nth_element(x, 1))
df["ss3"] = df["second_solutions"].apply(lambda x: nth_element(x, 2))
df["fp1"] = df["first_positions"].apply(lambda x: nth_element(x, 0))
df["fp2"] = df["first_positions"].apply(lambda x: nth_element(x, 1))
df["fp3"] = df["first_positions"].apply(lambda x: nth_element(x, 2))
df["sp1"] = df["second_positions"].apply(lambda x: nth_element(x, 0))
df["sp2"] = df["second_positions"].apply(lambda x: nth_element(x, 1))
df["sp3"] = df["second_positions"].apply(lambda x: nth_element(x, 2))
df["p1"] = df["projects"].apply(lambda x: nth_element(x, 0))
df["p2"] = df["projects"].apply(lambda x: nth_element(x, 1))
df["p3"] = df["projects"].apply(lambda x: nth_element(x, 2))
print("df:")
nullseries = df.isnull().sum()
print(nullseries[nullseries > 0])
print(df.shape)
drop_columns = [
"cl_id", "first_demands_x", "first_demands_y", "first_demands", "second_demands_x", "second_demands_y", "second_demands",
"first_solutions_x", "first_solutions_y", "first_solutions", "second_solutions_x", "second_solutions_y",
"second_solutions", "first_positions_x", "first_positions_y", "first_positions", "second_positions_x",
"second_positions_y", "second_positions", "projects_x", "projects_y", "projects"
]
# for col in drop_columns:
# if col in df.columns:
# df.drop(col, inplace=True, axis=1)
df.drop(drop_columns, inplace=True, axis=1)
return df
def device_diary_fe(device_id, diary_ids, device_dict, diary_dict):
time_1 = timeit.default_timer()
device_info = device_dict.get(device_id, {}).copy()
if not device_info:
device_info = {
"device_id": device_id,
"active_type": "1",
"active_days": "0",
"past_consume_ability_history": "极弱",
"potential_consume_ability_history": "极弱",
"price_sensitive_history": "不敏感无消费",
"device_click_num_1d": 0,
"device_click_num_3d": 0,
"device_click_num_7d": 0,
"device_click_num_15d": 0,
"device_click_num_30d": 0,
"device_click_num_180d": 0
}
device_fd = device_info.get("first_demands", [])
device_sd = device_info.get("second_demands", [])
device_fs = device_info.get("first_solutions", [])
device_ss = device_info.get("second_solutions", [])
device_fp = device_info.get("first_positions", [])
device_sp = device_info.get("second_positions", [])
device_p = device_info.get("projects", [])
device_info["device_fd"] = nth_element(device_fd, 0)
device_info["device_sd"] = nth_element(device_sd, 0)
device_info["device_fs"] = nth_element(device_fs, 0)
device_info["device_ss"] = nth_element(device_ss, 0)
device_info["device_fp"] = nth_element(device_fp, 0)
device_info["device_sp"] = nth_element(device_sp, 0)
device_info["device_p"] = nth_element(device_p, 0)
diary_lst = []
diary_ids_res = []
for id in diary_ids:
diary_info = diary_dict.get(id, {}).copy()
if diary_info:
diary_ids_res.append(diary_info.get("card_id", "-1"))
diary_fd = diary_info.get("first_demands", [])
diary_sd = diary_info.get("second_demands", [])
diary_fs = diary_info.get("first_solutions", [])
diary_ss = diary_info.get("second_solutions", [])
diary_fp = diary_info.get("first_positions", [])
diary_sp = diary_info.get("second_positions", [])
diary_p = diary_info.get("projects", [])
common_fd = common_elements(device_fd, diary_fd)
common_sd = common_elements(device_sd, diary_sd)
common_fs = common_elements(device_fs, diary_fs)
common_ss = common_elements(device_ss, diary_ss)
common_fp = common_elements(device_fp, diary_fp)
common_sp = common_elements(device_sp, diary_sp)
common_p = common_elements(device_p, diary_p)
diary_info["content_fd"] = nth_element(diary_fd, 0)
diary_info["content_sd"] = nth_element(diary_sd, 0)
diary_info["content_fs"] = nth_element(diary_fs, 0)
diary_info["content_ss"] = nth_element(diary_ss, 0)
diary_info["content_fp"] = nth_element(diary_fp, 0)
diary_info["content_sp"] = nth_element(diary_sp, 0)
diary_info["content_p"] = nth_element(diary_p, 0)
diary_info["fd1"] = nth_element(common_fd, 0)
diary_info["fd2"] = nth_element(common_fd, 1)
diary_info["fd3"] = nth_element(common_fd, 2)
diary_info["sd1"] = nth_element(common_sd, 0)
diary_info["sd2"] = nth_element(common_sd, 1)
diary_info["sd3"] = nth_element(common_sd, 2)
diary_info["fs1"] = nth_element(common_fs, 0)
diary_info["fs2"] = nth_element(common_fs, 1)
diary_info["fs3"] = nth_element(common_fs, 2)
diary_info["ss1"] = nth_element(common_ss, 0)
diary_info["ss2"] = nth_element(common_ss, 1)
diary_info["ss3"] = nth_element(common_ss, 2)
diary_info["fp1"] = nth_element(common_fp, 0)
diary_info["fp2"] = nth_element(common_fp, 1)
diary_info["fp3"] = nth_element(common_fp, 2)
diary_info["sp1"] = nth_element(common_sp, 0)
diary_info["sp2"] = nth_element(common_sp, 1)
diary_info["sp3"] = nth_element(common_sp, 2)
diary_info["p1"] = nth_element(common_p, 0)
diary_info["p2"] = nth_element(common_p, 1)
diary_info["p3"] = nth_element(common_p, 2)
diary_lst.append(diary_info)
total_1 = (timeit.default_timer() - time_1)
print("join device diary cost {:.5f}s".format(total_1))
return device_info, diary_lst, diary_ids_res
...@@ -6,7 +6,7 @@ from tensorflow import feature_column as fc ...@@ -6,7 +6,7 @@ from tensorflow import feature_column as fc
from tensorflow.python.estimator.canned import head as head_lib from tensorflow.python.estimator.canned import head as head_lib
from tensorflow.python.ops.losses import losses from tensorflow.python.ops.losses import losses
from .fe import device_diary_fe from .diary_fe import device_diary_fe
from .utils import common_elements, nth_element from .utils import common_elements, nth_element
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment