Commit 8212b4a2 authored by 高雅喆's avatar 高雅喆

更新画像优化

parent 825fcac0
......@@ -45,42 +45,8 @@ def get_user_service_portrait(cl_id, all_word_tags, all_tag_tag_type, all_3tag_2
db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
db='jerry_test', charset='utf8')
cur_jerry_test = db_jerry_test.cursor()
# # 用户的非搜索、支付、验证的行为
# user_df_service_sql = "select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log " \
# "where cl_id ='{}' and action not in " \
# "('api/order/validate','api/settlement/alipay_callback','do_search')".format(cl_id)
# cur_jerry_test.execute(user_df_service_sql)
# 用户的非搜索行为
user_df_service_sql = "select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log " \
"where cl_id ='{}' and action != 'do_search' ".format(cl_id)
cur_jerry_test.execute(user_df_service_sql)
data = list(cur_jerry_test.fetchall())
if data:
user_df_service = pd.DataFrame(data)
user_df_service.columns = ["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"]
else:
user_df_service = pd.DataFrame(columns=["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"])
# 用户的搜索行为
user_df_search_sql = "select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log " \
"where cl_id ='{}' and action = 'do_search'".format(cl_id)
cur_jerry_test.execute(user_df_search_sql)
data_search = list(cur_jerry_test.fetchall())
if data_search:
user_df_search = pd.DataFrame(data_search)
user_df_search.columns = ["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"]
else:
user_df_search = pd.DataFrame(columns=["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"])
# 搜索词转成tag
# user_df_search_2_tag = pd.DataFrame(columns=list(user_df_service.columns))
for index, row in user_df_search.iterrows():
if row['tag_referrer'] in all_word_tags:
for search_tag in all_word_tags[row['tag_referrer']]:
row['tag_id'] = int(search_tag)
user_df_service = user_df_service.append(row, ignore_index=True)
break
user_df_service = get_user_log(cl_id, all_word_tags)
# 增加df字段(days_diff_now, tag_type, tag2)
if not user_df_service.empty:
......@@ -149,20 +115,20 @@ def get_user_service_portrait(cl_id, all_word_tags, all_tag_tag_type, all_3tag_2
.format(stat_date=stat_date, cl_id=cl_id, tag_list=gmkv_tag_score_sum)
cur_jerry_test.execute(replace_sql)
db_jerry_test.commit()
# 写tidb 用户分层营销
# todo 不准确,因为聚合后,一个标签会有多个来源,即多个pay_type
score_result = tag_score_sum[["tag2", "cl_id", "tag_score", "weight", "pay_type"]]
score_result.rename(columns={"tag2": "tag_id", "cl_id": "device_id", "tag_score": "score"}, inplace=True)
delete_sql = "delete from api_market_personas where device_id='{}'".format(cl_id)
cur_jerry_test.execute(delete_sql)
db_jerry_test.commit()
for index, row in score_result.iterrows():
insert_sql = "insert into api_market_personas values (null, {}, '{}', {}, {}, {})".format(
row['tag_id'], row['device_id'], row['score'], row['weight'], row['pay_type'])
cur_jerry_test.execute(insert_sql)
db_jerry_test.commit()
db_jerry_test.close()
# # 写tidb 用户分层营销
# # todo 不准确,因为聚合后,一个标签会有多个来源,即多个pay_type
# score_result = tag_score_sum[["tag2", "cl_id", "tag_score", "weight", "pay_type"]]
# score_result.rename(columns={"tag2": "tag_id", "cl_id": "device_id", "tag_score": "score"}, inplace=True)
# delete_sql = "delete from api_market_personas where device_id='{}'".format(cl_id)
# cur_jerry_test.execute(delete_sql)
# db_jerry_test.commit()
#
# for index, row in score_result.iterrows():
# insert_sql = "insert into api_market_personas values (null, {}, '{}', {}, {}, {})".format(
# row['tag_id'], row['device_id'], row['score'], row['weight'], row['pay_type'])
# cur_jerry_test.execute(insert_sql)
# db_jerry_test.commit()
# db_jerry_test.close()
return "sucess"
except Exception as e:
print(e)
......
......@@ -268,3 +268,46 @@ def exponential_decay(days_diff, decay_days=30, normalization_size=7):
def args_test(x):
    """Return the fixed prefix ``"gyz add"`` followed by ``str(x)``."""
    suffix = str(x)
    return "".join(("gyz add", suffix))
def _search_word_to_tag(word, all_word_tags):
    """Return the first tag id mapped to search word *word*, or -1 if none.

    -1 is the sentinel that ``get_user_log`` filters out of its result.
    """
    if word in all_word_tags:
        word_tag_list = all_word_tags[word]
        if word_tag_list:
            return int(word_tag_list[0])
    return -1


def get_user_log(cl_id, all_word_tags, pay_time=0, debug=0):
    """Load one user's rows from ``user_new_tag_log`` as a DataFrame.

    Rows whose ``action`` is ``"do_search"`` get their ``tag_id`` replaced by
    the first tag that ``all_word_tags`` maps their ``tag_referrer`` to;
    search rows without a mapping are dropped. All other rows are kept as
    read from the table (minus payment callbacks when ``debug`` is truthy).

    Args:
        cl_id: value matched against the ``cl_id`` column.
        all_word_tags: mapping from search word to a sequence of tag ids.
        pay_time: when not equal to 0, only rows with ``time < pay_time``
            are read.
        debug: when truthy, rows whose action is
            ``"api/settlement/alipay_callback"`` are excluded as well.

    Returns:
        A DataFrame with columns ``time, cl_id, score_type, tag_id,
        tag_referrer, action`` and no row whose ``tag_id`` is -1. On any
        error the traceback is printed and the frame built so far is
        returned (an empty frame if the query itself failed).
    """
    columns = ["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"]
    user_df_service = pd.DataFrame(columns=columns)
    db_jerry_test = None
    try:
        # NOTE(review): credentials are hard-coded in source; they should be
        # moved to configuration/secret storage.
        db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
                                        db='jerry_test', charset='utf8')
        cur_jerry_test = db_jerry_test.cursor()

        # Let the driver escape the values instead of formatting them into
        # the SQL text.
        user_df_service_sql = "select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log " \
                              "where cl_id = %s"
        sql_args = [cl_id]
        if pay_time != 0:
            user_df_service_sql += " and time < %s"
            sql_args.append(pay_time)
        cur_jerry_test.execute(user_df_service_sql, sql_args)
        data = list(cur_jerry_test.fetchall())
        if not data:
            return user_df_service
        user_df_service = pd.DataFrame(data)
        user_df_service.columns = columns

        # Search rows; copied so the tag_id assignment below writes to an
        # independent frame rather than a slice of the full log.
        user_df_search = user_df_service[user_df_service["action"] == "do_search"].copy()

        # Non-search rows (and, in debug mode, non-payment rows).
        excluded_actions = ["do_search"]
        if debug:
            excluded_actions.append("api/settlement/alipay_callback")
        user_df_service = user_df_service.loc[~user_df_service["action"].isin(excluded_actions)]

        # Convert search words to tags. Assign the whole column: writing to
        # the row objects yielded by iterrows() only changes copies.
        if not user_df_search.empty:
            user_df_search["tag_id"] = [
                _search_word_to_tag(word, all_word_tags)
                for word in user_df_search["tag_referrer"]
            ]
            # DataFrame.append was removed in pandas 2.0; concat keeps the
            # original index labels just like append did.
            user_df_service = pd.concat([user_df_service, user_df_search])
        return user_df_service[user_df_service["tag_id"] != -1]
    except Exception:
        print("error2_user_portrait", traceback.format_exc())
        return user_df_service
    finally:
        # Always release the connection, including on early return or error.
        if db_jerry_test is not None:
            db_jerry_test.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment