Commit a4c31b88 authored by 高雅喆

evaluation metrics

parent a578114c
import pymysql
import redis
import datetime
import time
import json
import numpy as np
import pandas as pd
from eda.smart_rank.dist_update_user_portrait_service import *
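# the wildcard import above supplies the scoring helpers used below
# (compute_henqiang, compute_jiaoqiang, compute_ai_scan, compute_ruoyixiang,
# compute_validate, get_action_tag_count, get_tag2_from_tag3) and the lookup
# loaders used in __main__ (get_all_word_tags, get_all_tag_tag_type,
# get_all_3tag_2tag)
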
def get_user_service_portrait_not_alipay(cl_id, all_word_tags, all_tag_tag_type, all_3tag_2tag, size=10):
    try:
        db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
                                        db='jerry_test', charset='utf8')
        cur_jerry_test = db_jerry_test.cursor()
        # user behaviour excluding search and Alipay-payment actions
        user_df_service_sql = "select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log " \
                              "where cl_id ='{}' and action not in " \
                              "('api/settlement/alipay_callback','do_search')".format(cl_id)
        cur_jerry_test.execute(user_df_service_sql)
        data = list(cur_jerry_test.fetchall())
        if data:
            user_df_service = pd.DataFrame(data)
            user_df_service.columns = ["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"]
        else:
            user_df_service = pd.DataFrame(columns=["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"])
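        # fall back to an empty frame with the same columns so the search rows
        # appended below always share one schema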
        # the user's search behaviour
        user_df_search_sql = "select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log " \
                             "where cl_id ='{}' and action = 'do_search'".format(cl_id)
        cur_jerry_test.execute(user_df_search_sql)
        data_search = list(cur_jerry_test.fetchall())
        # keep the connection open: it is reused for the TiDB writes below and
        # closed at the end of the function
        if data_search:
            user_df_search = pd.DataFrame(data_search)
            user_df_search.columns = ["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"]
        else:
            user_df_search = pd.DataFrame(columns=["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"])
        # map search terms to tags
        # user_df_search_2_tag = pd.DataFrame(columns=list(user_df_service.columns))
        for index, row in user_df_search.iterrows():
            if row['tag_referrer'] in all_word_tags:
                for search_tag in all_word_tags[row['tag_referrer']]:
                    row['tag_id'] = int(search_tag)
                    user_df_service = user_df_service.append(row, ignore_index=True)
                    break
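        # note the break: each search term contributes at most one tag (the first
        # one listed in all_word_tags) to the behaviour frame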
        # add derived columns (days_diff_now, tag_type, tag2)
        if not user_df_service.empty:
            user_df_service["days_diff_now"] = round((int(time.time()) - user_df_service["time"].astype(float)) / (24 * 60 * 60))
            user_df_service["tag_type"] = user_df_service.apply(lambda x: all_tag_tag_type.get(x["tag_id"]), axis=1)
            user_df_service = user_df_service[user_df_service['tag_type'].isin(['2', '3'])]
            user_log_df_tag2_list = user_df_service[user_df_service['tag_type'] == '2']['tag_id'].unique().tolist()
            user_df_service["tag2"] = user_df_service.apply(lambda x:
                                                            get_tag2_from_tag3(x.tag_id, all_3tag_2tag, user_log_df_tag2_list)
                                                            if x.tag_type == '3' else x.tag_id, axis=1)
            user_df_service["tag2_type"] = user_df_service.apply(lambda x: all_tag_tag_type.get(x["tag2"]), axis=1)
            # compute per-row scores and their share of the total
            user_df_service["tag_score"] = user_df_service.apply(
                lambda x: compute_henqiang(x.days_diff_now)/get_action_tag_count(user_df_service, x.time) if x.score_type == "henqiang" else (
                    compute_jiaoqiang(x.days_diff_now)/get_action_tag_count(user_df_service, x.time) if x.score_type == "jiaoqiang" else (
                        compute_ai_scan(x.days_diff_now)/get_action_tag_count(user_df_service, x.time) if x.score_type == "ai_scan" else (
                            compute_ruoyixiang(x.days_diff_now)/get_action_tag_count(user_df_service, x.time) if x.score_type == "ruoyixiang" else
                            compute_validate(x.days_diff_now)/get_action_tag_count(user_df_service, x.time)))), axis=1)
            tag_score_sum = user_df_service.groupby(by=["tag2", "tag2_type"]).agg(
                {'tag_score': 'sum', 'cl_id': 'first', 'action': 'first'}).reset_index().sort_values(by=["tag_score"],
                                                                                                     ascending=False)
            tag_score_sum['weight'] = 100 * tag_score_sum['tag_score'] / tag_score_sum['tag_score'].sum()
            tag_score_sum["pay_type"] = tag_score_sum.apply(
                lambda x: 3 if x.action == "api/order/validate" else (
                    2 if x.action == "api/settlement/alipay_callback" else 1
                ), axis=1
            )
            gmkv_tag_score_sum = tag_score_sum[["tag2", "tag_score", "weight"]][:size].to_dict('records')
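            # gmkv_tag_score_sum holds the top `size` level-2 tags as
            # [{"tag2": ..., "tag_score": ..., "weight": ...}, ...]; this is the
            # portrait that gets cached in Redis below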
            # write to gmkv (Redis)
            gm_kv_cli = redis.Redis(host="172.16.40.135", port=5379, db=2, socket_timeout=2000)
            cl_id_portrait_key = "user:service_portrait_tags:cl_id:" + str(cl_id)
            tag_id_list_json = json.dumps(gmkv_tag_score_sum)
            gm_kv_cli.set(cl_id_portrait_key, tag_id_list_json)
            gm_kv_cli.expire(cl_id_portrait_key, time=30 * 24 * 60 * 60)
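            # the cached portrait expires after 30 days, matching the 30-day
            # activity window used to select devices in __main__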
            # write to TiDB, kept in sync with Redis
            stat_date = datetime.datetime.today().strftime('%Y-%m-%d')
            replace_sql = """replace into user_service_portrait_tags (stat_date, cl_id, tag_list) values("{stat_date}","{cl_id}","{tag_list}")"""\
                .format(stat_date=stat_date, cl_id=cl_id, tag_list=gmkv_tag_score_sum)
            cur_jerry_test.execute(replace_sql)
            db_jerry_test.commit()
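            # replace into overwrites any existing row for the same key, so re-running
            # the job does not accumulate duplicate portrait rows (assuming the table's
            # unique key covers stat_date and cl_id)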
            # write to TiDB for user-segmentation marketing
            score_result = tag_score_sum[["tag2", "cl_id", "tag_score", "weight", "pay_type"]]
            score_result.rename(columns={"tag2": "tag_id", "cl_id": "device_id", "tag_score": "score"}, inplace=True)
            delete_sql = "delete from api_market_personas where device_id='{}'".format(cl_id)
            cur_jerry_test.execute(delete_sql)
            db_jerry_test.commit()
            for index, row in score_result.iterrows():
                insert_sql = "insert into api_market_personas values (null, {}, '{}', {}, {}, {})".format(
                    row['tag_id'], row['device_id'], row['score'], row['weight'], row['pay_type'])
                cur_jerry_test.execute(insert_sql)
                db_jerry_test.commit()
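            # api_market_personas is refreshed with a delete-then-insert per device;
            # the leading null presumably fills an auto-increment id column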
        db_jerry_test.close()
        return "success"
    except Exception as e:
        print(e)
if __name__ == '__main__':
    try:
        db_jerry_test = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
                                        db='jerry_test', charset='utf8')
        cur_jerry_test = db_jerry_test.cursor()
        # device ids of users active in the last 30 days
        sql_device_ids = "select distinct cl_id from user_new_tag_log " \
                         "where time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))"
        cur_jerry_test.execute(sql_device_ids)
        device_ids_lst = [i[0] for i in cur_jerry_test.fetchall()]
        db_jerry_test.close()
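        # each device id collected here gets its portrait rebuilt below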
        # tags for search terms and their synonyms
        all_word_tags = get_all_word_tags()
        all_tag_tag_type = get_all_tag_tag_type()
        # level-3 tag -> level-2 tag mapping
        all_3tag_2tag = get_all_3tag_2tag()
        # minimal driver sketch (assumed): refresh the service portrait of every
        # recently active device
        for cl_id in device_ids_lst:
            get_user_service_portrait_not_alipay(cl_id, all_word_tags, all_tag_tag_type, all_3tag_2tag)
    except Exception as e:
        print(e)