# -*- coding: UTF-8 -*-
import argparse
import pymysql
import redis
import datetime
import time
import json
import numpy as np
import pandas as pd
from tool import *
import logging
from collections import defaultdict


def get_count(actions):
    counts = defaultdict(int)
    for x in actions:
        counts[x] += 1
    return counts


def setup_logger(logger_name, log_file, level=logging.INFO):
    my_log = logging.getLogger(logger_name)
    formatter = logging.Formatter('%(message)s')
    file_handler = logging.FileHandler(log_file, mode='a')
    file_handler.setFormatter(formatter)
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)
    my_log.setLevel(level)
    my_log.addHandler(file_handler)
    my_log.addHandler(stream_handler)


def get_user_service_portrait_not_alipay(cl_id, all_word_tags, all_tag_tag_type, pay_time, all_3tag_2tag,
                                         version=1, exponential=1, normalization_size=7, decay_days=30, size=10):
    """
    :param cl_id: device id
    :param all_word_tags: mapping from search word (and synonyms) to tag ids
    :param all_tag_tag_type: mapping from tag id to tag type
    :param pay_time: timestamp of the user's order
    :param all_3tag_2tag: mapping from 3rd-level tag id to 2nd-level tag ids
    :param version: 0: Xiangyu's version; 1: Yinghe's version
    :param size: number of top tags to return
    :return: the user portrait (payment actions excluded)
    """
    try:
        db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
                                        db='jerry_test', charset='utf8')
        cur_jerry_test = db_jerry_test.cursor()

        # The user's actions other than search and payment
        user_df_service_sql = "select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log " \
                              "where cl_id ='{cl_id}' and time < {pay_time} and action not in " \
                              "('api/settlement/alipay_callback','do_search')".format(cl_id=cl_id, pay_time=pay_time)
        cur_jerry_test.execute(user_df_service_sql)
        data = list(cur_jerry_test.fetchall())
        if data:
            user_df_service = pd.DataFrame(data)
            user_df_service.columns = ["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"]
        else:
            user_df_service = pd.DataFrame(columns=["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"])

        # The user's search actions
        user_df_search_sql = "select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log " \
                             "where cl_id ='{cl_id}' and time < {pay_time} and " \
                             "action = 'do_search'".format(cl_id=cl_id, pay_time=pay_time)
        cur_jerry_test.execute(user_df_search_sql)
        data_search = list(cur_jerry_test.fetchall())
        db_jerry_test.close()
        if data_search:
            user_df_search = pd.DataFrame(data_search)
            user_df_search.columns = ["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"]
        else:
            user_df_search = pd.DataFrame(columns=["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"])

        # Convert search words into tags
        # user_df_search_2_tag = pd.DataFrame(columns=list(user_df_service.columns))
        for index, row in user_df_search.iterrows():
            if row['tag_referrer'] in all_word_tags:
                for search_tag in all_word_tags[row['tag_referrer']]:
                    row['tag_id'] = int(search_tag)
                    user_df_service = user_df_service.append(row, ignore_index=True)
                    break

        # Add derived columns (days_diff_now, tag_type, tag2)
        if not user_df_service.empty:
            user_df_service["days_diff_now"] = round((int(time.time()) - user_df_service["time"].astype(float)) / (24 * 60 * 60))
            user_df_service["tag_type"] = user_df_service.apply(lambda x: all_tag_tag_type.get(x["tag_id"]), axis=1)
            user_df_service = user_df_service[user_df_service['tag_type'].isin(['2', '3'])]
            user_log_df_tag2_list = user_df_service[user_df_service['tag_type'] == '2']['tag_id'].unique().tolist()
            user_df_service["tag2"] = user_df_service.apply(
                lambda x: get_tag2_from_tag3(x.tag_id, all_3tag_2tag, user_log_df_tag2_list)
                if x.tag_type == '3' else x.tag_id, axis=1)
            user_df_service["tag2_type"] = user_df_service.apply(lambda x: all_tag_tag_type.get(x["tag2"]), axis=1)
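            # Scoring note: each row's score comes from one of the compute_* helpers imported from
            # tool (chosen by score_type); judging by their arguments they apply a time decay driven
            # by days_diff_now, decay_days, normalization_size and exponential, but they are defined
            # in tool and not shown here. In version 1 the row score is additionally divided by
            # get_action_tag_count(...) (apparently the number of tag rows the same action produced
            # at the same timestamp) and then summed per 2nd-level tag; in version 0 the raw row
            # scores are kept and only the highest-scored row per 2nd-level tag is retained.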
            # Compute scores and weights
            if version == 1:
                user_df_service["tag_score"] = user_df_service.apply(
                    lambda x: compute_henqiang(x.days_diff_now, decay_days, normalization_size, exponential) / get_action_tag_count(user_df_service, x.time)
                    if x.score_type == "henqiang" else (
                        compute_jiaoqiang(x.days_diff_now, decay_days, normalization_size, exponential) / get_action_tag_count(user_df_service, x.time)
                        if x.score_type == "jiaoqiang" else (
                            compute_ai_scan(x.days_diff_now, decay_days, normalization_size, exponential) / get_action_tag_count(user_df_service, x.time)
                            if x.score_type == "ai_scan" else (
                                compute_ruoyixiang(x.days_diff_now, decay_days, normalization_size, exponential) / get_action_tag_count(user_df_service, x.time)
                                if x.score_type == "ruoyixiang" else
                                compute_validate(x.days_diff_now, decay_days, normalization_size, exponential) / get_action_tag_count(user_df_service, x.time)))),
                    axis=1)
                finally_score = user_df_service.groupby(by=["tag2", "tag2_type"]).agg(
                    {'tag_score': 'sum', 'cl_id': 'first', 'action': get_count}).reset_index().sort_values(
                    by=["tag_score"], ascending=False)
                finally_score['weight'] = 100 * finally_score['tag_score'] / finally_score['tag_score'].sum()
                finally_score["pay_type"] = finally_score.apply(
                    lambda x: 3 if x.action == "api/order/validate" else (
                        2 if x.action == "api/settlement/alipay_callback" else 1
                    ), axis=1
                )
                gmkv_tag_score_sum_list = finally_score["tag2"].to_list()[:size]
                # Source of each tag's score (action info), kept for debugging
                debug_tag_score_sum = finally_score[["tag2", "tag_score", "action"]][:size].to_dict('records')
                debug_tag_score_sum_dict = {info["tag2"]: info for info in debug_tag_score_sum}
            elif version == 0:
                user_df_service["tag_score"] = user_df_service.apply(
                    lambda x: compute_henqiang(x.days_diff_now, decay_days, normalization_size, exponential)
                    if x.score_type == "henqiang" else (
                        compute_jiaoqiang(x.days_diff_now, decay_days, normalization_size, exponential)
                        if x.score_type == "jiaoqiang" else (
                            compute_ai_scan(x.days_diff_now, decay_days, normalization_size, exponential)
                            if x.score_type == "ai_scan" else (
                                compute_ruoyixiang(x.days_diff_now, decay_days, normalization_size, exponential)
                                if x.score_type == "ruoyixiang" else
                                compute_validate(x.days_diff_now, decay_days, normalization_size, exponential)))),
                    axis=1)
                finally_score = user_df_service.sort_values(by=["tag_score", "time"], ascending=False)
                finally_score.drop_duplicates(subset="tag2", inplace=True, keep="first")
                finally_score["weight"] = 100 * finally_score['tag_score'] / finally_score['tag_score'].sum()
                gmkv_tag_score_sum_list = finally_score["tag2"].to_list()[:size]
                # Source of each tag's score (action info), kept for debugging
                debug_tag_score_sum = finally_score[["tag2", "tag_score", "action", "time"]][:size].to_dict('records')
                debug_tag_score_sum_dict = {info["tag2"]: str(datetime.datetime.fromtimestamp(int(info["time"])))
                                            for info in debug_tag_score_sum}
        # The user has no portrait
        else:
            gmkv_tag_score_sum_list = list()
            debug_tag_score_sum_dict = dict()
        return gmkv_tag_score_sum_list, debug_tag_score_sum_dict
    except Exception as e:
        print(e)
        return list(), dict()


def get_2_tags_coincide_rate(device_order_tags, device_portrait_result, portrait_top_n, coincide_n):
    """
    :param device_order_tags: {device_id: [tag ids of yesterday's actions]}
    :param device_portrait_result: {device_id: [portrait tag ids, ordered by score]}
    :param portrait_top_n: number of top portrait tags used for the comparison
    :param coincide_n: number of overlapping tags required to count as a match
    :return: match rate = matched devices / devices that ordered yesterday and have a portrait * 100%
             "matched" means the top portrait_top_n tags of the portrait (order and validation actions
             removed) share at least coincide_n tags with the tags of the services (美购) the user ordered
    """
    device_count = len(device_order_tags)           # total number of devices that placed an order
    coincide_count = 0                              # number of matched devices
    not_coincide_no_portrait = 0                    # number of unmatched devices without a portrait
    not_coincide_no_portrait_device_ids = []        # unmatched devices without a portrait
    not_coincide_have_portrait_device_ids = []      # unmatched devices with a portrait
    not_coincide_have_portrait = 0                  # number of unmatched devices with a portrait
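    # Worked example (hypothetical numbers): with portrait_top_n=3 and coincide_n=1, a device whose
    # order tags are [5, 12] and whose portrait tags are [12, 7, 3, 9] counts as matched, because the
    # top-3 portrait tags [12, 7, 3] share one tag (12) with the order tags.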
    for device in device_order_tags:
        order_tags = device_order_tags[device]
        portrait_tags = device_portrait_result[device]
        if portrait_tags:
            portrait_tags = portrait_tags[:portrait_top_n]
        else:
            # Devices without a portrait
            not_coincide_no_portrait += 1
            not_coincide_no_portrait_device_ids.append(device)
            continue
        # Has a portrait and matches
        if len(set(order_tags).intersection(set(portrait_tags))) >= coincide_n:
            coincide_count += 1
        # Has a portrait but does not match
        else:
            not_coincide_have_portrait += 1
            not_coincide_have_portrait_device_ids.append(device)
    coincide_rate = coincide_count / (device_count - not_coincide_no_portrait)
    result = {"device_count": device_count,
              "coincide_count": coincide_count,
              "coincide_rate": coincide_rate,
              "not_coincide_have_portrait_count": not_coincide_have_portrait,
              "not_coincide_no_portrait_count": not_coincide_no_portrait,
              "not_coincide_no_portrait_device_ids": not_coincide_no_portrait_device_ids,
              "not_coincide_have_portrait_device_ids": not_coincide_have_portrait_device_ids}
    return result


def get_user_order_info_yesterday(order_date, order_date_tomorrow):
    # Get the device ids of users who placed orders yesterday, the services (美购) they ordered, and
    # the tags of those services.
    # api_order only has the user's user_id; one user_id maps to multiple device_ids.
    # A user can place multiple orders (services) at once, and one service maps to multiple tags.
    sql_device_info_yesterday = """
        SELECT tmp1.user_id, c.device_id, tmp1.service_ids, tmp1.tag_ids, tmp1.pay_time
        FROM (SELECT tmp.user_id, tmp.service_ids, tmp.tag_ids, tmp.pay_time, max(tmp.device_id) device_id_id
              FROM (SELECT a.user_id, a.service_ids, a.tag_ids, a.pay_time, b.device_id
                    FROM (SELECT user_id,
                                 max(pay_time) AS pay_time,
                                 group_concat(DISTINCT `service_id` separator ',') service_ids,
                                 group_concat(DISTINCT `tag_id` separator ',') tag_ids
                          FROM (SELECT d.user_id, d.service_id, unix_timestamp(d.pay_time) AS pay_time, e.tag_id
                                FROM api_order d
                                LEFT JOIN api_servicetag e ON d.service_id = e.service_id
                                LEFT JOIN api_tag f ON e.tag_id = f.id
                                WHERE d.status=1
                                  AND d.pay_time>'{order_date}'
                                  AND d.pay_time<'{order_date_tomorrow}'
                                  AND f.tag_type+0 <'4'+0) tmp2
                          GROUP BY user_id) a
                    LEFT JOIN statistic_device_user b ON a.user_id = b.user_id) tmp
              GROUP BY tmp.user_id) tmp1
        LEFT JOIN statistic_device c ON tmp1.device_id_id = c.id
        WHERE c.device_id IS NOT NULL
    """.format(order_date=order_date, order_date_tomorrow=order_date_tomorrow)
    mysql_results = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing',
                                      sql_device_info_yesterday)
    device_ids_info = [(i["device_id"], int(i["pay_time"])) for i in mysql_results]
    all_device_action_tags = {i["device_id"]: [int(tag) for tag in i["tag_ids"].split(",")] for i in mysql_results}
    return device_ids_info, all_device_action_tags


def get_user_diary_click_info_yesterday(click_date, click_date_tomorrow):
    # Get the device ids of users who clicked diaries on the home featured (home精选) feed yesterday,
    # the diaries they clicked, and the tags of those diaries.
    # One user maps to multiple diaries, and one diary maps to multiple tags.
    sql_device_info_yesterday = """
        SELECT cl_id device_id,
               max(click_time) AS click_time,
               group_concat(DISTINCT `diary_id` separator ',') diary_ids,
               group_concat(DISTINCT `tag_id` separator ',') tag_ids
        FROM (SELECT d.cl_id, d.diary_id, unix_timestamp(d.click_time) AS click_time, e.tag_id
              FROM jerry_test.user_click_diary_log d
              LEFT JOIN eagle.src_mimas_prod_api_diary_tags e ON d.diary_id = e.diary_id
              LEFT JOIN eagle.src_zhengxing_api_tag f ON e.tag_id = f.id
              WHERE d.action_from='home精选'
                AND d.action='on_click_card'
                AND d.click_time>'{click_date}'
                AND d.click_time<'{click_date_tomorrow}'
                AND f.tag_type+0 <'4'+0
                AND SUBSTR(MD5(d.cl_id),-1,1) IN ('0','1','2','3','4','a','b','c','e')) tmp2
        GROUP BY cl_id
    """.format(click_date=click_date, click_date_tomorrow=click_date_tomorrow)
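    # Note on the SUBSTR(MD5(d.cl_id),-1,1) filter above (also used in the service-click query
    # below): it keeps only devices whose cl_id hashes to one of 9 of the 16 possible hex characters,
    # i.e. roughly a 9/16 sample of devices -- presumably the bucket this statistic is meant to cover.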
    mysql_results = get_data_by_mysql('172.16.40.158', 4000, 'root', '3SYz54LS9#^9sBvC', 'jerry_test',
                                      sql_device_info_yesterday)
    device_ids_info = [(i["device_id"], int(i["click_time"])) for i in mysql_results]
    all_device_action_tags = {i["device_id"]: [int(tag) for tag in i["tag_ids"].split(",")] for i in mysql_results}
    return device_ids_info, all_device_action_tags


def get_user_service_click_info_yesterday(click_date, click_date_tomorrow):
    # Get the device ids of users who clicked services on the service (美购) home page yesterday,
    # the services they clicked, and the tags of those services.
    # One user maps to multiple services, and one service maps to multiple tags.
    sql_device_info_yesterday = """
        SELECT cl_id device_id,
               max(click_time) AS click_time,
               group_concat(DISTINCT `service_id` separator ',') service_ids,
               group_concat(DISTINCT `tag_id` separator ',') tag_ids
        FROM (SELECT d.cl_id, d.service_id, unix_timestamp(d.click_time) AS click_time, e.tag_id
              FROM jerry_test.user_click_service_log d
              LEFT JOIN eagle.src_zhengxing_api_servicetag e ON d.service_id = e.service_id
              LEFT JOIN eagle.src_zhengxing_api_tag f ON e.tag_id = f.id
              WHERE d.action_from='welfare_home_list_item'
                AND d.action='goto_welfare_detail'
                AND d.click_time>'{click_date}'
                AND d.click_time<'{click_date_tomorrow}'
                AND f.tag_type+0 <'4'+0
                AND SUBSTR(MD5(d.cl_id),-1,1) IN ('0','1','2','3','4','a','b','c','e')) tmp2
        GROUP BY cl_id
    """.format(click_date=click_date, click_date_tomorrow=click_date_tomorrow)
    mysql_results = get_data_by_mysql('172.16.40.158', 4000, 'root', '3SYz54LS9#^9sBvC', 'jerry_test',
                                      sql_device_info_yesterday)
    device_ids_info = [(i["device_id"], int(i["click_time"])) for i in mysql_results]
    all_device_action_tags = {i["device_id"]: [int(tag) for tag in i["tag_ids"].split(",")] for i in mysql_results}
    return device_ids_info, all_device_action_tags


if __name__ == '__main__':
    try:
        parser = argparse.ArgumentParser(description='Portrait match-rate statistics')
        my_yesterday = str(datetime.date.today() - datetime.timedelta(days=1))
        parser.add_argument("-o", "--order_date", type=str, dest="order_date", default=my_yesterday,
                            help="date of the actions to evaluate")
        parser.add_argument("-log1", "--log1_file", type=str, dest="portrait_stat_log_path",
                            default="portrait_stat.log", help="log file for the portrait statistics")
        parser.add_argument("-log2", "--log2_file", type=str, dest="debug_portrait_stat_log_path",
                            default="debug_portrait_stat.log", help="log file for the portrait debug details")
        parser.add_argument("-t", "--top", type=int, dest="portrait_top_n", default=3,
                            help="number of top portrait tags used to compute the match rate")
        parser.add_argument("-c", "--coincide", type=int, dest="coincide_n", default=1,
                            help="number of overlapping tags required to count as a match")
        parser.add_argument("-v", "--version", type=int, dest="version", default=1,
                            help="which version to evaluate: Xiangyu (0) or Yinghe (1)")
        parser.add_argument("-e", "--exponential", type=int, dest="exponential", default=0,
                            help="whether to use exponential decay")
        parser.add_argument("-n", "--normalization_size", type=int, dest="normalization_size", default=7,
                            help="interval used to normalize the day difference")
        parser.add_argument("-d", "--decay_days", type=int, dest="decay_days", default=30,
                            help="number of days over which scores decay")
        parser.add_argument("-a", "--action_type", dest="action_type", nargs='+',
                            help="action types used to compute the match rate")
        parser.add_argument("-s", "--save_tidb", type=int, dest="save_tidb", default=1,
                            help="whether to save the statistics to tidb")
        args = parser.parse_args()
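        # Example invocation (illustrative only; the script name and date below are placeholders,
        # not taken from the original source):
        #   python portrait_coincide_stat.py -o 2019-11-20 -a order diary service -v 1 -t 3 -c 1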
        order_date = args.order_date
        order_date_tomorrow = str(datetime.datetime.strptime(order_date, '%Y-%m-%d') + datetime.timedelta(days=1))
        portrait_stat_log_path = args.portrait_stat_log_path
        debug_portrait_stat_log_path = args.debug_portrait_stat_log_path
        cmd_portrait_top_n = args.portrait_top_n
        cmd_coincide_n = args.coincide_n
        version = args.version
        exponential = args.exponential
        normalization_size = args.normalization_size
        decay_days = args.decay_days
        action_type = args.action_type
        save_tidb = args.save_tidb

        LOG_DIR = "/home/gmuser/gyz/log/"
        my_today = str(datetime.date.today())
        setup_logger("log1", LOG_DIR + portrait_stat_log_path)
        setup_logger("log2", LOG_DIR + debug_portrait_stat_log_path)
        log1 = logging.getLogger('log1')
        log2 = logging.getLogger('log2')

        # Tags of the search words and their synonyms
        all_word_tags = get_all_word_tags()
        all_tag_tag_type = get_all_tag_tag_type()
        # 2nd-level tag of each 3rd-level tag
        all_3tag_2tag = get_all_3tag_2tag()

        for action in action_type:
            # Get the device ids that acted yesterday and the tags of those actions
            action_type_detail = ""
            device_ids_lst = list()
            all_device_action_tags = dict()
            if action == "order":
                device_ids_lst, all_device_action_tags = get_user_order_info_yesterday(order_date, order_date_tomorrow)
                action_type_detail = "昨天下单了的用户"
            elif action == "diary":
                device_ids_lst, all_device_action_tags = get_user_diary_click_info_yesterday(order_date, order_date_tomorrow)
                action_type_detail = "昨天在首页精选点击了日记的用户"
            elif action == "service":
                device_ids_lst, all_device_action_tags = get_user_service_click_info_yesterday(order_date, order_date_tomorrow)
                action_type_detail = "昨天在美购首页点击了美购的用户"
            else:
                break

            # Expand the action tags with their 2nd-level tags
            all_device_action_tags2 = dict()
            for device in all_device_action_tags:
                tags = all_device_action_tags[device]
                tags2 = []
                for tag in tags:
                    tags2 += all_3tag_2tag.get(tag, [])
                all_device_action_tags2[device] = tags + tags2

            # Each user's portrait with payment actions removed
            all_device_portrait_result = dict()
            debug_all_device_portrait_result = dict()
            for order_info in device_ids_lst:
                device = order_info[0]
                pay_time = order_info[1]
                portrait_result, debug_portrait_result = get_user_service_portrait_not_alipay(
                    device, all_word_tags, all_tag_tag_type, pay_time, all_3tag_2tag,
                    version=version, exponential=exponential, normalization_size=normalization_size,
                    decay_days=decay_days, size=-1)
                all_device_portrait_result[device] = portrait_result
                debug_all_device_portrait_result[device] = debug_portrait_result

            # Overlap rate between the two tag lists
            result = get_2_tags_coincide_rate(all_device_action_tags2, all_device_portrait_result,
                                              cmd_portrait_top_n, cmd_coincide_n)

            # Portrait details of users who have a portrait but did not match
            no_coincide_devices = result["not_coincide_have_portrait_device_ids"]
            log2.info({"统计日期": my_today})
            log2.info({"参数信息": args})
            log2.info({"版本": "英赫版" if version == 1 else "翔宇版"})
            log2.info({"统计用户": action_type_detail})
            for device in no_coincide_devices:
                no_coincide_devices_debug = dict()
                device_portrait_n = all_device_portrait_result[device][:args.portrait_top_n]
                device_order_tags = all_device_action_tags2[device]
                debug_device_portrait_result = debug_all_device_portrait_result[device]
                no_coincide_devices_debug[device] = {
                    "画像的前{top_n}个tag".format(top_n=args.portrait_top_n):
                        [debug_device_portrait_result[tag] for tag in device_portrait_n],
                    "用户行为对应的tag":
                        [debug_device_portrait_result.get(tag, dict()) for tag in device_order_tags]
                }
                log2.info("-" * 66)
                log2.info(no_coincide_devices_debug)
            log2.info("\n" * 6)

            # How long the portrait update took and how many devices it updated
            sql = "select count(*) from user_service_portrait_tags where stat_date='{my_today}'".format(my_today=my_today)
            portrait_device_count = get_data_by_mysql('172.16.40.158', 4000, 'root', '3SYz54LS9#^9sBvC',
                                                      'jerry_test', sql)
            with open(LOG_DIR + "dist_portrait.log", 'r') as f:
                lines = f.readlines()
            # Elapsed time between the first and last timestamped lines of dist_portrait.log
            # (timedelta.seconds ignores the days component, so this assumes the update starts and
            # finishes on the same day)
            start_datetime_str = lines[0][:19]
            end_datetime_str = lines[-1][:19]
            start_datetime = datetime.datetime.strptime(start_datetime_str, '%Y-%m-%d %H:%M:%S')
            end_datetime = datetime.datetime.strptime(end_datetime_str, '%Y-%m-%d %H:%M:%S')
            time_consuming = (end_datetime - start_datetime).seconds / 60
log1.info({"画像信息统计日期": my_today}) log1.info({"参数信息": args}) log1.info({"版本": "英赫版" if version == 1 else "翔宇版"}) log1.info({"统计用户": action_type_detail}) log1.info({"画像更新耗时(分钟)": time_consuming}) log1.info({"画像更新的设备数": portrait_device_count[0]["count(*)"]}) log1.info("") log1.info({"统计画像匹配度所用数据的日期": order_date}) log1.info({"统计画像的选取前n个tag": cmd_portrait_top_n}) log1.info({"重合个数": cmd_coincide_n}) log1.info({"有行为的人数": result["device_count"]}) log1.info({"比对的上的人数": result["coincide_count"]}) log1.info({"匹配度": result["coincide_rate"]}) log1.info({"比对不上的有画像的人数": result["not_coincide_have_portrait_count"]}) log1.info({"比对不上的无画像的人数": result["not_coincide_no_portrait_count"]}) log1.info("="*66) # 统计数据进tidb if save_tidb: tplt = "insert into user_portrait_tags_evalution values(null,'{0}','{1}','{2}',{3},{4},{5},{6},{7})" stat_date = order_date.replace("-", "") insert_sql = tplt.format(stat_date, "英赫版" if version == 1 else "翔宇版", action_type_detail, result["device_count"], result["coincide_count"], result["coincide_rate"], result["not_coincide_have_portrait_count"], result["not_coincide_no_portrait_count"]) write_data_by_mysql('172.16.40.158', 4000, 'root', '3SYz54LS9#^9sBvC', 'jerry_test', insert_sql) except Exception as e: print(e)