import pymysql
import redis
import datetime
import time
import json
import numpy as np
import pandas as pd
# Helpers such as compute_henqiang / compute_jiaoqiang / compute_ai_scan / compute_ruoyixiang /
# compute_validate / get_action_tag_count / get_tag2_from_tag3 / get_all_word_tags /
# get_all_tag_tag_type / get_all_3tag_2tag come from this module.
from eda.smart_rank.dist_update_user_portrait_service import *


def get_user_service_portrait_not_alipay(cl_id, all_word_tags, all_tag_tag_type, all_3tag_2tag, size=10):
    try:
        db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
                                        db='jerry_test', charset='utf8')
        cur_jerry_test = db_jerry_test.cursor()

        # User behaviour excluding search and Alipay payment callbacks
        user_df_service_sql = "select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log " \
                              "where cl_id ='{}' and action not in " \
                              "('api/settlement/alipay_callback','do_search')".format(cl_id)
        cur_jerry_test.execute(user_df_service_sql)
        data = list(cur_jerry_test.fetchall())
        if data:
            user_df_service = pd.DataFrame(data)
            user_df_service.columns = ["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"]
        else:
            user_df_service = pd.DataFrame(columns=["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"])

        # User search behaviour
        user_df_search_sql = "select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log " \
                             "where cl_id ='{}' and action = 'do_search'".format(cl_id)
        cur_jerry_test.execute(user_df_search_sql)
        data_search = list(cur_jerry_test.fetchall())
        # Keep the connection open here; it is reused for the TiDB writes below.
        if data_search:
            user_df_search = pd.DataFrame(data_search)
            user_df_search.columns = ["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"]
        else:
            user_df_search = pd.DataFrame(columns=["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"])

        # Map search keywords to tags; only the first mapped tag of each keyword is kept
        for index, row in user_df_search.iterrows():
            if row['tag_referrer'] in all_word_tags:
                for search_tag in all_word_tags[row['tag_referrer']]:
                    row['tag_id'] = int(search_tag)
                    user_df_service = user_df_service.append(row, ignore_index=True)
                    break

        # Add derived columns (days_diff_now, tag_type, tag2)
        if not user_df_service.empty:
            user_df_service["days_diff_now"] = round(
                (int(time.time()) - user_df_service["time"].astype(float)) / (24 * 60 * 60))
            user_df_service["tag_type"] = user_df_service.apply(lambda x: all_tag_tag_type.get(x["tag_id"]), axis=1)
            # Keep only level-2 and level-3 tags, and map level-3 tags up to their level-2 parent
            user_df_service = user_df_service[user_df_service['tag_type'].isin(['2', '3'])]
            user_log_df_tag2_list = user_df_service[user_df_service['tag_type'] == '2']['tag_id'].unique().tolist()
            user_df_service["tag2"] = user_df_service.apply(
                lambda x: get_tag2_from_tag3(x.tag_id, all_3tag_2tag, user_log_df_tag2_list)
                if x.tag_type == '3' else x.tag_id, axis=1)
            user_df_service["tag2_type"] = user_df_service.apply(lambda x: all_tag_tag_type.get(x["tag2"]), axis=1)

            # Compute per-row scores by action strength, normalized by the tag count of the same action
            user_df_service["tag_score"] = user_df_service.apply(
                lambda x: compute_henqiang(x.days_diff_now) / get_action_tag_count(user_df_service, x.time)
                if x.score_type == "henqiang" else (
                    compute_jiaoqiang(x.days_diff_now) / get_action_tag_count(user_df_service, x.time)
                    if x.score_type == "jiaoqiang" else (
                        compute_ai_scan(x.days_diff_now) / get_action_tag_count(user_df_service, x.time)
                        if x.score_type == "ai_scan" else (
                            compute_ruoyixiang(x.days_diff_now) / get_action_tag_count(user_df_service, x.time)
                            if x.score_type == "ruoyixiang"
                            else compute_validate(x.days_diff_now) / get_action_tag_count(user_df_service, x.time)))),
                axis=1)
            # Aggregate scores per level-2 tag and convert them to percentage weights
            tag_score_sum = user_df_service.groupby(by=["tag2", "tag2_type"]).agg(
                {'tag_score': 'sum', 'cl_id': 'first', 'action': 'first'}).reset_index().sort_values(
                by=["tag_score"], ascending=False)
            tag_score_sum['weight'] = 100 * tag_score_sum['tag_score'] / tag_score_sum['tag_score'].sum()
            tag_score_sum["pay_type"] = tag_score_sum.apply(
                lambda x: 3 if x.action == "api/order/validate" else (
                    2 if x.action == "api/settlement/alipay_callback" else 1),
                axis=1)
            gmkv_tag_score_sum = tag_score_sum[["tag2", "tag_score", "weight"]][:size].to_dict('records')

            # Write to gmkv (Redis), with a 30-day TTL
            gm_kv_cli = redis.Redis(host="172.16.40.135", port=5379, db=2, socket_timeout=2000)
            cl_id_portrait_key = "user:service_portrait_tags:cl_id:" + str(cl_id)
            tag_id_list_json = json.dumps(gmkv_tag_score_sum)
            gm_kv_cli.set(cl_id_portrait_key, tag_id_list_json)
            gm_kv_cli.expire(cl_id_portrait_key, time=30 * 24 * 60 * 60)

            # Write to TiDB, kept in sync with Redis
            stat_date = datetime.datetime.today().strftime('%Y-%m-%d')
            replace_sql = """replace into user_service_portrait_tags (stat_date, cl_id, tag_list) values("{stat_date}","{cl_id}","{tag_list}")""" \
                .format(stat_date=stat_date, cl_id=cl_id, tag_list=gmkv_tag_score_sum)
            cur_jerry_test.execute(replace_sql)
            db_jerry_test.commit()

            # Write to TiDB for tiered user marketing
            # .copy() avoids renaming on a view (SettingWithCopyWarning)
            score_result = tag_score_sum[["tag2", "cl_id", "tag_score", "weight", "pay_type"]].copy()
            score_result.rename(columns={"tag2": "tag_id", "cl_id": "device_id", "tag_score": "score"}, inplace=True)
            delete_sql = "delete from api_market_personas where device_id='{}'".format(cl_id)
            cur_jerry_test.execute(delete_sql)
            db_jerry_test.commit()
            for index, row in score_result.iterrows():
                insert_sql = "insert into api_market_personas values (null, {}, '{}', {}, {}, {})".format(
                    row['tag_id'], row['device_id'], row['score'], row['weight'], row['pay_type'])
                cur_jerry_test.execute(insert_sql)
                db_jerry_test.commit()

        # Close the connection on both the empty and non-empty paths
        db_jerry_test.close()
        return "success"
    except Exception as e:
        print(e)


if __name__ == '__main__':
    try:
        db_jerry_test = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
                                        db='jerry_test', charset='utf8')
        cur_jerry_test = db_jerry_test.cursor()

        # Device ids of users active in the last 30 days
        sql_device_ids = "select distinct cl_id from user_new_tag_log " \
                         "where time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))"
        cur_jerry_test.execute(sql_device_ids)
        device_ids_lst = [i[0] for i in cur_jerry_test.fetchall()]
        db_jerry_test.close()

        # Tags corresponding to search keywords and their synonyms
        all_word_tags = get_all_word_tags()
        all_tag_tag_type = get_all_tag_tag_type()
        # Level-2 tag corresponding to each level-3 tag
        all_3tag_2tag = get_all_3tag_2tag()
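        # Hypothetical continuation (not in the original source, which is cut off here):
        # the driver presumably builds one portrait per device id, e.g. with a
        # multiprocessing pool. The pool size and the use of starmap are illustrative
        # assumptions only; device_ids_lst and the tag mappings are the objects loaded above.
        #
        #     from multiprocessing import Pool
        #
        #     with Pool(processes=8) as pool:
        #         pool.starmap(
        #             get_user_service_portrait_not_alipay,
        #             [(cl_id, all_word_tags, all_tag_tag_type, all_3tag_2tag)
        #              for cl_id in device_ids_lst])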