# -*- coding: UTF-8 -*-
# !/usr/bin/env python
"""Consume user-behaviour events from Kafka and refresh LinUCB recommendations.

Each recognised event is reduced to a set of tag ids. Those tags receive a
positive reward in the device's LinUCB state, after which the device's
recommended tags, topics and pictorials cached in Redis are rebuilt.
"""

# NOTE: the import order below is intentionally left as it was: the star
# import can shadow (or be shadowed by) neighbouring names, so regrouping
# could silently change which object a name refers to.
from kafka import KafkaConsumer
import random
from libs.cache import redis_client
import logging
from linucb.views.linucb import LinUCB
import json
from trans2es.models.tag import TopicTag,Tag
from trans2es.models.topic import TopicHomeRecommend
import traceback
from django.conf import settings
from libs.es import ESPerform
from search.utils.common import *
import libs.tools as Tools
from trans2es.models.pictorial import CommunityPictorialHomeFeed
from libs.error import logging_exception
import os
from search.views.tag import get_same_tagset_ids
import msgpack


def loads_data(data):
    """Decode a raw Kafka payload.

    JSON is tried first; if that fails for any reason the payload is decoded
    with msgpack instead (errors from msgpack propagate to the caller).

    Returns:
        tuple: ``(decoded_value, is_json)`` where ``is_json`` is True when the
        JSON decoder succeeded and False when msgpack was used.
    """
    try:
        return json.loads(data), True
    except Exception:
        # Not JSON (or not decodable as text): fall back to msgpack.
        return msgpack.loads(data), False


class KafkaManager(object):
    """Holds one process-wide KafkaConsumer instance."""

    # Cached consumer; created on first use. (Attribute name kept as-is,
    # including its spelling, because external code may reference it.)
    consumser_obj = None

    @classmethod
    def get_kafka_consumer_ins(cls, topic_name=None):
        """Return the shared consumer, creating and subscribing it on first call.

        Args:
            topic_name: topic to subscribe to; falls back to
                ``settings.KAFKA_TOPIC_NAME`` when falsy. Only honoured on the
                call that creates the consumer; later calls return the cached
                instance unchanged.

        The consumer is always subscribed to
        ``settings.KAFKA_GM_LOGGING_TOPIC_NAME`` as well.
        """
        if cls.consumser_obj is None:
            topic_name = topic_name if topic_name else settings.KAFKA_TOPIC_NAME
            consumer = KafkaConsumer(bootstrap_servers=settings.KAFKA_BROKER_LIST)
            consumer.subscribe([topic_name, settings.KAFKA_GM_LOGGING_TOPIC_NAME])
            cls.consumser_obj = consumer
        return cls.consumser_obj


class CollectData(object):
    """Turns consumed click events into LinUCB updates and Redis recommendation caches."""

    # Every handled event is positive feedback (a click), so the reward is 1.
    POSITIVE_REWARD = 1
    # Lifetime of the cached recommended-tag list: 7 days.
    RECOMMEND_TAG_TTL_SECONDS = 7 * 24 * 60 * 60
    # Skin-check answers are rewarded this many times per submission.
    SKIN_CHECK_REWARD_ROUNDS = 5
    # Only the first N recommended tags are used to look up topics/pictorials.
    MAX_RECOMMEND_TAGS_FOR_LOOKUP = 20

    def __init__(self):
        # Per-device LinUCB state (a Redis hash).
        self.linucb_matrix_redis_prefix = "physical:linucb:device_id:"
        # Marked "deprecated" by the original author (still written below).
        self.linucb_recommend_redis_prefix = "physical:linucb:tag_recommend:device_id:"
        # Recommended topics (posts).
        self.linucb_recommend_topic_id_prefix = "physical:linucb:topic_recommend:device_id:"
        # Recommended pictorials.
        self.linucb_recommend_pictorial_id_prefix = "physical:linucb:pictorial_recommend:device_id:"
        self.tag_topic_id_redis_prefix = "physical:tag_id:topic_id_list:"
        self.click_recommend_redis_key_prefix = "physical:click_recommend:device_id:"
        # Default user feature vector, used when the caller supplies none.
        self.user_feature = [0, 1]

    def _get_user_linucb_info(self, device_id):
        """Fetch the device's LinUCB hash from Redis.

        Per the original author's note, hash keys are tag ids and each value
        holds four matrices.

        Returns:
            dict: the Redis hash, or an empty dict if the lookup raised.
        """
        try:
            redis_key = self.linucb_matrix_redis_prefix + str(device_id)
            return redis_client.hgetall(redis_key)
        except Exception:
            logging_exception()
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return dict()

    def update_recommend_tag_list(self, device_id, user_feature=None, user_id=None,
                                  click_topic_tag_list=None, new_user_click_tag_list=None):
        """Recompute and cache the device's recommended tags, topics and pictorials.

        Args:
            device_id: device whose recommendations are refreshed.
            user_feature: feature vector for LinUCB; ``self.user_feature`` when falsy.
            user_id: optional user id, forwarded to the default-tag and
                have-read lookups.
            click_topic_tag_list: tags of a just-clicked topic; when non-empty,
                up to two matching topics are placed at the front of the
                topic list.
            new_user_click_tag_list: explicitly chosen tags; when non-empty
                they replace the LinUCB-ranked tags as the source for the
                topic/pictorial lookup. Any iterable is accepted.

        Returns:
            bool: True when the update ran to completion (including the case
            where there was nothing to recommend), False if anything raised.
        """
        try:
            # Copy into a real list: avoids a shared mutable default and
            # materialises lazy iterables (e.g. a QuerySet) exactly once.
            new_user_click_tag_list = (
                [] if new_user_click_tag_list is None else list(new_user_click_tag_list)
            )

            linucb_tag_data = self._get_user_linucb_info(device_id)
            if not linucb_tag_data:
                # No state for this device yet: seed it with the default tags.
                recommend_tag_list = list(LinUCB.get_default_tag_list(user_id))
                LinUCB.init_device_id_linucb_info(
                    redis_client, self.linucb_matrix_redis_prefix, device_id, recommend_tag_list)
            else:
                user_feature = user_feature if user_feature else self.user_feature
                recommend_tag_dict, _ = LinUCB.linucb_recommend_tag(
                    device_id, linucb_tag_data, user_feature, list(linucb_tag_data.keys()))
                recommend_tag_list = list(recommend_tag_dict.keys())

            if not recommend_tag_list:
                return True

            tag_recommend_redis_key = self.linucb_recommend_redis_prefix + str(device_id)
            redis_client.set(tag_recommend_redis_key, json.dumps(recommend_tag_list))
            redis_client.expire(tag_recommend_redis_key, self.RECOMMEND_TAG_TTL_SECONDS)

            # Ids to exclude from the lookups: everything already read plus
            # everything currently promoted on the home feed.
            have_read_topic_id_list = Tools.get_have_read_topic_id_list(
                device_id, user_id, TopicPageType.HOME_RECOMMEND)
            have_read_lin_pictorial_id_list = Tools.get_have_read_lin_pictorial_id_list(
                device_id, user_id, TopicPageType.HOME_RECOMMEND)
            have_read_topic_id_list.extend(
                TopicHomeRecommend.objects.using(settings.SLAVE1_DB_NAME).filter(
                    is_online=1).values_list("topic_id", flat=True))
            have_read_lin_pictorial_id_list.extend(
                CommunityPictorialHomeFeed.objects.using(settings.SLAVE1_DB_NAME).filter(
                    is_deleted=0, is_online=1).values_list("pictorial_id", flat=True))

            # Topics related to the topic that was just clicked (at most 2).
            recommend_topic_id_list = []
            recommend_topic_id_list_dict = {}
            if click_topic_tag_list:
                click_tagset_ids = get_same_tagset_ids(click_topic_tag_list)
                clicked_topic_ids, clicked_topic_dict = ESPerform.get_tag_topic_list_dict(
                    click_tagset_ids, have_read_topic_id_list, size=2)
                if clicked_topic_ids:
                    recommend_topic_id_list.extend(clicked_topic_ids)
                    recommend_topic_id_list_dict.update(clicked_topic_dict)

            pictorial_recommend_redis_key = self.linucb_recommend_pictorial_id_prefix + str(device_id)
            topic_recommend_redis_key = self.linucb_recommend_topic_id_prefix + str(device_id)

            # Explicitly chosen tags win over the LinUCB ranking.
            source_tag_ids = (
                new_user_click_tag_list
                or recommend_tag_list[0:self.MAX_RECOMMEND_TAGS_FOR_LOOKUP]
            )
            tagset_ids = get_same_tagset_ids(source_tag_ids)
            tag_topic_id_list, tag_topic_dict = ESPerform.get_tag_topic_list_dict(
                tagset_ids, have_read_topic_id_list)
            recommend_lin_pictorial_id_list = ESPerform.get_tag_pictorial_id_list(
                tagset_ids, have_read_lin_pictorial_id_list)

            if recommend_topic_id_list or tag_topic_id_list or new_user_click_tag_list:
                tag_topic_id_list = recommend_topic_id_list + tag_topic_id_list
                tag_topic_dict.update(recommend_topic_id_list_dict)
                redis_client.hmset(topic_recommend_redis_key, {
                    "data": json.dumps(tag_topic_id_list),
                    "datadict": json.dumps(tag_topic_dict),
                    "cursor": 0,
                })

            if recommend_lin_pictorial_id_list:
                redis_client.hmset(pictorial_recommend_redis_key, {
                    "data": json.dumps(recommend_lin_pictorial_id_list),
                    "cursor": 0,
                })
            return True
        except Exception:
            logging_exception()
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return False

    def update_user_linucb_tag_info(self, reward, device_id, tag_id, user_feature=None):
        """Apply one reward observation for ``tag_id`` to the device's LinUCB state.

        Returns:
            Whatever ``LinUCB.update_linucb_info`` returns, or False if it raised.
        """
        try:
            user_feature = user_feature if user_feature else self.user_feature
            return LinUCB.update_linucb_info(
                user_feature, reward, tag_id, device_id,
                self.linucb_matrix_redis_prefix, redis_client)
        except Exception:
            logging_exception()
            logging.error("update_user_linucb_tag_info error!")
            return False

    @staticmethod
    def _coerce_id_list(value):
        """Return ``value`` as an id list: JSON-decode a str, pass a list through.

        Returns None for any other type so callers can tell "unsupported" from "empty".
        """
        if isinstance(value, str):
            return json.loads(value)
        if isinstance(value, list):
            return value
        return None

    def _reward_tags(self, device_id, tag_ids, user_feature, rounds=1):
        """Give every tag in ``tag_ids`` a positive reward, ``rounds`` times over."""
        for _ in range(rounds):
            for tag_id in tag_ids:
                self.update_user_linucb_tag_info(
                    self.POSITIVE_REWARD, device_id, tag_id, user_feature)

    def _handle_click(self, raw_val_dict, event_type, user_feature):
        """Handle ``on_click_feed_topic_card`` and ``on_click_button`` events."""
        click_topic_tag_list = []
        # Only assigned (and only needed) when a branch below produces tags.
        device_id = user_id = None
        params = raw_val_dict["params"]

        if event_type == "on_click_feed_topic_card":
            topic_id = params["topic_id"]
            device_id = raw_val_dict["device"]["device_id"]
            user_id = raw_val_dict.get("user_id")
            logging.info("consume topic_id:%s,device_id:%s" % (str(topic_id), str(device_id)))

            # Keep only the topic's online "collection" tags ...
            topic_tag_rows = TopicTag.objects.using(settings.SLAVE1_DB_NAME).filter(
                topic_id=topic_id, is_online=True).values_list("tag_id", "is_collection")
            collection_tag_ids = [
                tag_id for tag_id, is_collection in topic_tag_rows if is_collection
            ]
            # ... that are themselves online, not deleted and not a category.
            click_topic_tag_list.extend(
                Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
                    id__in=collection_tag_ids, is_online=True, is_deleted=False,
                    is_category=False).values_list("id", flat=True))
            logging.info("positive tag_list,device_id:%s,topic_id:%s,tag_list:%s" % (
                str(device_id), str(topic_id), str(click_topic_tag_list)))
        elif all(key in params for key in ("page_name", "button_name", "extra_param")):
            # on_click_button: only "follow tag" on the search detail page counts.
            if params["page_name"] == "search_detail" and params["button_name"] == "focus_tag":
                tag_name = params["extra_param"]
                device_id = raw_val_dict["device"]["device_id"]
                user_id = raw_val_dict.get("user_id")
                click_topic_tag_list.extend(
                    Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
                        name=tag_name, is_online=True, is_deleted=False,
                        is_category=False).values_list("id", flat=True))
                logging.info("query tag attention,positive tag_list,device_id:%s,query_name:%s,tag_list:%s" % (
                    str(device_id), tag_name, str(click_topic_tag_list)))

        logging.info("click_topic_tag_list:%s" % (str(click_topic_tag_list)))
        self._reward_tags(device_id, click_topic_tag_list, user_feature)
        # Refresh recommendations only after all rewards have been applied.
        if click_topic_tag_list:
            self.update_recommend_tag_list(
                device_id, user_feature, user_id, click_topic_tag_list=click_topic_tag_list)

    def _handle_interest_choice(self, raw_val_dict, user_feature):
        """Handle ``interest_choice_click_next``: the user picked interest tags."""
        tagid_list = self._coerce_id_list(raw_val_dict["params"]["tagid_list"])
        if tagid_list is None:
            tagid_list = []
            logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
        device_id = raw_val_dict["device"]["device_id"]
        user_id = raw_val_dict.get("user_id")

        if not tagid_list:
            logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
            return

        tag_query_results = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
            id__in=tagid_list, is_online=True, is_deleted=False,
            is_category=False).values_list("id", flat=True))
        self._reward_tags(device_id, tag_query_results, user_feature)
        # Refresh recommendations only after all rewards have been applied.
        self.update_recommend_tag_list(
            device_id, user_feature, user_id, new_user_click_tag_list=tag_query_results)

    def _handle_push_click(self, raw_val_dict, user_feature):
        """Handle ``on_click_push``: the user opened a personalised push."""
        params = raw_val_dict["params"]
        # Per the original note, the backend has already filtered these down
        # to the editorial tags of the pushed topic/pictorial.
        if "tag_ids" in params:
            tag_ids = params["tag_ids"]
        else:
            # TODO: client tracking bug emits the key as "tag_ids:"; remove later.
            tag_ids = params.get("tag_ids:", [])
        tagid_list = self._coerce_id_list(tag_ids) or []
        device_id = raw_val_dict["device"]["device_id"]
        user_id = raw_val_dict.get("user_id")

        if not tagid_list:
            return

        # NOTE(review): this is the only live query using SLAVE_DB_NAME; every
        # other branch reads from SLAVE1_DB_NAME. Confirm that is intended.
        # Materialised with list() so the same concrete ids are rewarded and
        # then passed on, matching the other handlers.
        tag_query_results = list(Tag.objects.using(settings.SLAVE_DB_NAME).filter(
            id__in=tagid_list, is_online=True, is_deleted=False,
            is_category=False).values_list("id", flat=True))
        self._reward_tags(device_id, tag_query_results, user_feature)
        self.update_recommend_tag_list(
            device_id, user_feature, user_id, new_user_click_tag_list=tag_query_results)
        logging.info("on_click_push topic type:%s, device_id:%s, tag_ids:%s" % (
            raw_val_dict.get("type", "missing type"), str(device_id), str(tagid_list)))

    def _handle_skin_check(self, raw_val_dict, user_feature):
        """Handle a msgpack log record carrying a skin-check questionnaire submission."""
        data = json.loads(raw_val_dict[b'content'])
        is_skin_check_submit = (
            'SYS' in data and 'APP' in data and 'action' in data['SYS']
            and data['SYS']['action'] == "venus/community/skin_check/submit_questions"
        )
        if not is_skin_check_submit:
            return

        device_id = data['SYS']['cl_id']
        tagid_list = list(data['APP'].get('answer_tag', []))
        user_id = data['SYS'].get('user_id', None)
        logging.info("skin_check topic type:%s, device_id:%s, answer_tag:%s" % (
            str(data['SYS']['action']), str(device_id), str(tagid_list)))
        if not tagid_list:
            return

        tag_query_results = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
            id__in=tagid_list, is_online=True, is_deleted=False,
            is_category=False).values_list("id", flat=True))
        # Keep the submitted order and duplicates, dropping ids that failed
        # the DB filter; a set makes the membership test O(1).
        valid_tag_ids = set(tag_query_results)
        rewarded_tag_ids = [tag_id for tag_id in tagid_list if tag_id in valid_tag_ids]
        self._reward_tags(
            device_id, rewarded_tag_ids, user_feature, rounds=self.SKIN_CHECK_REWARD_ROUNDS)
        # Refresh recommendations only after all rewards have been applied.
        self.update_recommend_tag_list(
            device_id, user_feature, user_id, new_user_click_tag_list=tag_query_results)
        logging.info("skin_check topic type:%s, device_id:%s, tag_query_results:%s" % (
            str(data['SYS']['action']), str(device_id), str(tag_query_results)))

    def _handle_message(self, ori_msg, user_feature):
        """Decode one Kafka record and dispatch it to the matching handler."""
        raw_val_dict, msg = loads_data(ori_msg.value)
        if msg:
            logging.info(ori_msg.value)

        event_type = raw_val_dict["type"] if "type" in raw_val_dict else None
        if event_type in ("on_click_feed_topic_card", "on_click_button"):
            self._handle_click(raw_val_dict, event_type, user_feature)
        # A disabled "page_precise_exposure" (zero-reward exposure feedback)
        # branch used to sit here; it was commented out and has been removed.
        elif event_type == "interest_choice_click_next":
            self._handle_interest_choice(raw_val_dict, user_feature)
        elif event_type == "on_click_push":
            self._handle_push_click(raw_val_dict, user_feature)
        elif b'content' in raw_val_dict:
            # msgpack-decoded records have bytes keys.
            self._handle_skin_check(raw_val_dict, user_feature)
        elif msg:
            logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))

    def consume_data_from_kafka(self, topic_name=None):
        """Poll Kafka forever, applying every record to the LinUCB state.

        Any exception raised while handling a record is logged and then the
        whole process is terminated via ``os._exit(0)`` so that the process
        supervisor can restart it (the original author's assumption is that
        such failures are database-connection problems).

        Args:
            topic_name: forwarded to ``KafkaManager.get_kafka_consumer_ins``.

        Returns:
            bool: False if consumer setup or polling raised. The loop itself
            never terminates normally.
        """
        try:
            user_feature = [1, 1]
            kafka_consumer_obj = KafkaManager.get_kafka_consumer_ins(topic_name)
            while True:
                msg_dict = kafka_consumer_obj.poll(timeout_ms=100)
                for consume_msg in msg_dict.values():
                    for ori_msg in consume_msg:
                        try:
                            self._handle_message(ori_msg, user_feature)
                        except Exception:
                            logging_exception()
                            logging.error("catch exception,err_msg:%s", traceback.format_exc())
                            # Assume a broken DB connection: hard-exit and let
                            # supervisor restart the worker.
                            # NOTE(review): exit status 0 may count as an
                            # "expected" exit for supervisor, in which case it
                            # will not restart unless autorestart=true.
                            os._exit(0)
            # NOTE(review): unreachable while the loop above has no break;
            # kept so the documented return contract is unchanged.
            return True
        except Exception:
            logging_exception()
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return False