# -*- coding: UTF-8 -*-
# !/usr/bin/env python

# NOTE: import order is kept as-is on purpose: the star import below may
# rebind names imported above it, so reordering could change behaviour.
from kafka import KafkaConsumer
import random
from libs.cache import redis_client
import logging
from linucb.views.linucb import LinUCB
import json
from trans2es.models.tag import TopicTag, Tag
from trans2es.models.topic import TopicHomeRecommend
import traceback
from django.conf import settings
from libs.es import ESPerform
from search.utils.common import *
import libs.tools as Tools


class KafkaManager(object):
    """Lazily create and cache a single KafkaConsumer shared by the process."""

    # Shared consumer; created on the first get_kafka_consumer_ins call.
    consumser_obj = None

    @classmethod
    def get_kafka_consumer_ins(cls, topic_name=None):
        """Return the cached KafkaConsumer, creating it on first use.

        :param topic_name: topic to consume; defaults to settings.KAFKA_TOPIC_NAME.
            Only honoured on the first call -- later calls return the existing
            consumer whatever topic is requested.
        :return: the shared KafkaConsumer instance.
        """
        if not cls.consumser_obj:
            topic_name = topic_name if topic_name else settings.KAFKA_TOPIC_NAME
            cls.consumser_obj = KafkaConsumer(topic_name, bootstrap_servers=settings.KAFKA_BROKER_LIST)
        return cls.consumser_obj


class CollectData(object):
    """Turn user-behaviour Kafka events into LinUCB updates and cached recommendations.

    Click-like events ("positive") apply reward 1 to the involved tags, exposure
    events ("negative") apply reward 0; afterwards the device's recommended tag
    list and recommended topic list are recomputed and written to redis.
    """

    # LinUCB rewards: 1 for click / interest-choice events, 0 for exposure events.
    _POSITIVE_REWARD = 1
    _NEGATIVE_REWARD = 0

    def __init__(self):
        # Per-device redis hash holding LinUCB state keyed by tag id.
        self.linucb_matrix_redis_prefix = "physical:linucb:device_id:"
        # Marked deprecated by the original author, but still written by
        # update_recommend_tag_list (JSON list of recommended tag ids).
        self.linucb_recommend_redis_prefix = "physical:linucb:tag_recommend:device_id:"
        # Per-device recommended topics (redis hash: data / datadict / cursor).
        self.linucb_recommend_topic_id_prefix = "physical:linucb:topic_recommend:device_id:"
        self.tag_topic_id_redis_prefix = "physical:tag_id:topic_id_list:"
        self.click_recommend_redis_key_prefix = "physical:click_recommend:device_id:"
        # Default user feature vector when a caller supplies none.
        self.user_feature = [0, 1]

    def _get_user_linucb_info(self, device_id):
        """Return the device's LinUCB redis hash, or an empty dict on error.

        Per the original author's note, keys are tag ids and each value holds
        the 4 LinUCB matrices for that tag.
        """
        try:
            redis_key = self.linucb_matrix_redis_prefix + str(device_id)
            return redis_client.hgetall(redis_key)
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return dict()

    def update_recommend_tag_list(self, device_id, user_feature=None, user_id=None,
                                  click_topic_tag_list=None, new_user_click_tag_list=None):
        """Recompute and cache the device's recommended tags and topics.

        If the device has no LinUCB state yet it is seeded from
        LinUCB.get_default_tag_list(user_id); otherwise tags are ranked by
        LinUCB.linucb_recommend_tag. When the resulting tag list is non-empty:

        * the tag list is stored as JSON under linucb_recommend_redis_prefix
          with a 7-day expiry;
        * topics are fetched from ES for ``new_user_click_tag_list`` if given,
          else for the first 100 recommended tags; topics for
          ``click_topic_tag_list`` (size=2) are prepended;
        * the topic list is stored as a hash {"data", "datadict", "cursor": 0}.

        :param device_id: device whose recommendations are refreshed.
        :param user_feature: LinUCB feature vector; falls back to self.user_feature.
        :param user_id: optional user id (used for defaults / have-read lookup).
        :param click_topic_tag_list: tags from a just-clicked item, or None.
        :param new_user_click_tag_list: tags picked by a new user; None means none.
        :return: True on success, False if anything raised (the error is logged).
        """
        # None instead of a mutable [] default so no list is shared across calls.
        if new_user_click_tag_list is None:
            new_user_click_tag_list = []
        try:
            redis_linucb_tag_data_dict = self._get_user_linucb_info(device_id)
            if not redis_linucb_tag_data_dict:
                # No LinUCB state yet: seed it with the default tag list.
                recommend_tag_list = LinUCB.get_default_tag_list(user_id)
                LinUCB.init_device_id_linucb_info(
                    redis_client, self.linucb_matrix_redis_prefix, device_id, recommend_tag_list)
            else:
                user_feature = user_feature if user_feature else self.user_feature
                recommend_tag_dict, _ = LinUCB.linucb_recommend_tag(
                    device_id, redis_linucb_tag_data_dict, user_feature,
                    list(redis_linucb_tag_data_dict.keys()))
                recommend_tag_list = list(recommend_tag_dict.keys())

            if len(recommend_tag_list) > 0:
                tag_recommend_redis_key = self.linucb_recommend_redis_prefix + str(device_id)
                redis_client.set(tag_recommend_redis_key, json.dumps(recommend_tag_list))
                redis_client.expire(tag_recommend_redis_key, 7 * 24 * 60 * 60)

                # Already-read topics plus online promoted home topics are passed to
                # ESPerform as the have-read list (presumably excluded from results).
                have_read_topic_id_list = Tools.get_have_read_topic_id_list(
                    device_id, user_id, TopicPageType.HOME_RECOMMEND)
                promote_recommend_topic_id_list = TopicHomeRecommend.objects.using(
                    settings.SLAVE_DB_NAME).filter(is_online=1).values_list("topic_id", flat=True)
                have_read_topic_id_list.extend(promote_recommend_topic_id_list)

                # Topics for the tags of a just-clicked item go first.
                recommend_topic_id_list = list()
                recommend_topic_id_list_dict = dict()
                if click_topic_tag_list:
                    click_topic_id_list, click_topic_id_dict = ESPerform.get_tag_topic_list_dict(
                        click_topic_tag_list, have_read_topic_id_list, size=2)
                    if click_topic_id_list:
                        recommend_topic_id_list.extend(click_topic_id_list)
                        recommend_topic_id_list_dict.update(click_topic_id_dict)

                tag_id_list = recommend_tag_list[0:100]
                topic_recommend_redis_key = self.linucb_recommend_topic_id_prefix + str(device_id)

                # A new user's explicit interest choices take precedence over LinUCB tags.
                source_tag_list = new_user_click_tag_list if len(new_user_click_tag_list) > 0 else tag_id_list
                tag_topic_id_list, tag_topic_dict = ESPerform.get_tag_topic_list_dict(
                    source_tag_list, have_read_topic_id_list)

                if recommend_topic_id_list or tag_topic_id_list or len(new_user_click_tag_list) > 0:
                    tag_topic_id_list = recommend_topic_id_list + tag_topic_id_list
                    tag_topic_dict.update(recommend_topic_id_list_dict)
                    redis_data_dict = {
                        "data": json.dumps(tag_topic_id_list),
                        "datadict": json.dumps(tag_topic_dict),
                        "cursor": 0,
                    }
                    redis_client.hmset(topic_recommend_redis_key, redis_data_dict)

            return True
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    def update_user_linucb_tag_info(self, reward, device_id, tag_id, user_feature=None):
        """Apply one LinUCB reward observation for (device_id, tag_id).

        :return: the result of LinUCB.update_linucb_info, or False if it raised.
        """
        try:
            user_feature = user_feature if user_feature else self.user_feature
            return LinUCB.update_linucb_info(user_feature, reward, tag_id, device_id,
                                             self.linucb_matrix_redis_prefix, redis_client)
        except Exception:
            # Log the traceback like the other handlers; the bare message alone
            # made failures impossible to diagnose.
            logging.error("update_user_linucb_tag_info error!,err_msg:%s" % traceback.format_exc())
            return False

    def _handle_click_event(self, raw_val_dict, user_feature):
        """Reward the tags behind a topic-card click or a tag-zone focus event."""
        params = raw_val_dict["params"]
        click_topic_tag_list = list()
        if raw_val_dict["type"] == "on_click_feed_topic_card":
            # Fall back to topic_id when business_id is absent or empty.
            topic_id = params.get("business_id") or params["topic_id"]
            device_id = raw_val_dict["device"]["device_id"]
            user_id = raw_val_dict.get("user_id")

            logging.info("consume topic_id:%s,device_id:%s" % (str(topic_id), str(device_id)))

            topic_tag_list = list(TopicTag.objects.using(settings.SLAVE_DB_NAME).filter(
                topic_id=topic_id, is_online=True).values_list("tag_id", flat=True))
            tag_query_results = Tag.objects.using(settings.SLAVE_DB_NAME).filter(
                id__in=topic_tag_list, is_online=True, is_deleted=False).values_list(
                "id", "collection", "is_ai")
            # Only tags that are both "collection" and "is_ai" are rewarded.
            for tag_id, collection, is_ai in tag_query_results:
                if collection and is_ai:
                    click_topic_tag_list.append(tag_id)

            logging.info("positive tag_list,device_id:%s,topic_id:%s,tag_list:%s" % (
                str(device_id), str(topic_id), str(click_topic_tag_list)))
        else:
            # tag_zone_click_focus: tags are looked up by name when params.type == "do".
            tag_name = params["query"]
            query_type = params["type"]
            device_id = raw_val_dict["device"]["device_id"]
            user_id = raw_val_dict.get("user_id")
            if query_type == "do":
                click_topic_tag_list.extend(Tag.objects.using(settings.SLAVE_DB_NAME).filter(
                    name=tag_name, is_online=True, is_deleted=False).values_list("id", flat=True))
            logging.info("query tag attention,positive tag_list,device_id:%s,query_name:%s,tag_list:%s" % (
                str(device_id), tag_name, str(click_topic_tag_list)))

        logging.info("click_topic_tag_list:%s" % (str(click_topic_tag_list)))

        for tag_id in click_topic_tag_list:
            self.update_user_linucb_tag_info(self._POSITIVE_REWARD, device_id, tag_id, user_feature)

        # Refresh recommendations only after every tag reward has been applied.
        if click_topic_tag_list:
            self.update_recommend_tag_list(device_id, user_feature, user_id,
                                           click_topic_tag_list=click_topic_tag_list)

    def _handle_exposure_event(self, raw_val_dict, user_feature):
        """Apply reward 0 to the tags of every exposed topic card."""
        exposure_cards = raw_val_dict["params"]["exposure_cards"]
        # exposure_cards arrives either as a JSON string or as an already-decoded list.
        if isinstance(exposure_cards, str):
            exposure_cards_list = json.loads(exposure_cards)
        elif isinstance(exposure_cards, list):
            exposure_cards_list = exposure_cards
        else:
            exposure_cards_list = list()
        device_id = raw_val_dict["device"]["device_id"]
        user_id = raw_val_dict.get("user_id")
        logging.warning("type msg:%s" % raw_val_dict.get("type"))

        exposure_topic_id_list = list()
        for item in exposure_cards_list:
            if "card_id" not in item:
                continue
            exposure_topic_id = item["card_id"]
            logging.info(
                "consume exposure topic_id:%s,device_id:%s" % (str(exposure_topic_id), str(device_id)))
            if exposure_topic_id:
                exposure_topic_id_list.append(exposure_topic_id)

        exposure_sql_query_results = list(TopicTag.objects.using(settings.SLAVE_DB_NAME).filter(
            topic_id__in=exposure_topic_id_list).values_list("topic_id", "tag_id", "is_online", "is_collection"))

        # Fetch the is_ai flag for all online tags in one query instead of one
        # query per TopicTag row (id is the primary key, so at most one row per id).
        online_tag_ids = list()
        for _topic_id, tag_id, is_online, _is_collection in exposure_sql_query_results:
            if is_online:
                online_tag_ids.append(tag_id)
        ai_tag_id_set = set()
        if online_tag_ids:
            for tag_id, is_ai in Tag.objects.using(settings.SLAVE_DB_NAME).filter(
                    id__in=online_tag_ids).values_list("id", "is_ai"):
                if is_ai == 1:
                    ai_tag_id_set.add(tag_id)

        topic_tag_id_dict = dict()
        tag_list = list()
        for topic_id, tag_id, is_online, is_collection in exposure_sql_query_results:
            if is_online and is_collection == 1:
                tag_list.append(tag_id)
            if is_online and tag_id in ai_tag_id_set and tag_id not in tag_list:
                tag_list.append(tag_id)
            # Only used for the log line below.
            topic_tag_id_dict.setdefault(topic_id, list()).append(tag_id)

        logging.info("negative tag_list,device_id:%s,topic_tag_id_dict:%s" % (
            str(device_id), str(topic_tag_id_dict)))
        for tag_id in tag_list:
            self.update_user_linucb_tag_info(self._NEGATIVE_REWARD, device_id, tag_id, user_feature)

        # Refresh recommendations only after every tag reward has been applied.
        self.update_recommend_tag_list(device_id, user_feature, user_id)

    def _handle_interest_choice_event(self, raw_val_dict, user_feature):
        """Reward the tags a new user picked and seed their topic list from them."""
        raw_tagid_list = raw_val_dict["params"]["tagid_list"]
        # tagid_list arrives either as a JSON string or as an already-decoded list.
        if isinstance(raw_tagid_list, str):
            tagid_list = json.loads(raw_tagid_list)
        elif isinstance(raw_tagid_list, list):
            tagid_list = raw_tagid_list
        else:
            tagid_list = list()
            logging.warning("unknown tagid_list format:%s,msg type:%s" % (
                type(raw_tagid_list), raw_val_dict.get("type", "missing type")))
        device_id = raw_val_dict["device"]["device_id"]
        user_id = raw_val_dict.get("user_id")

        if len(tagid_list) > 0:
            for tag_id in tagid_list:
                self.update_user_linucb_tag_info(self._POSITIVE_REWARD, device_id, tag_id, user_feature)

            # Refresh recommendations only after every tag reward has been applied.
            self.update_recommend_tag_list(device_id, user_feature, user_id,
                                           new_user_click_tag_list=tagid_list)

    def _process_message(self, raw_val_dict, user_feature):
        """Dispatch one decoded Kafka message to the handler for its "type"."""
        msg_type = raw_val_dict.get("type")
        if msg_type in ("on_click_feed_topic_card", "tag_zone_click_focus"):
            self._handle_click_event(raw_val_dict, user_feature)
        elif msg_type == "page_precise_exposure":
            self._handle_exposure_event(raw_val_dict, user_feature)
        elif msg_type == "interest_choice_click_next":
            self._handle_interest_choice_event(raw_val_dict, user_feature)
        else:
            logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))

    def consume_data_from_kafka(self, topic_name=None):
        """Poll Kafka forever and apply each behaviour event to LinUCB.

        A failure while handling one message is logged and that message is
        skipped, so a bad message does not stop the consumer. The loop never
        exits normally; False is returned if creating the consumer or polling
        raises. KeyboardInterrupt / SystemExit are not swallowed, so the
        process can be stopped cleanly.

        :param topic_name: optional topic, forwarded to KafkaManager.
        """
        try:
            # Feature vector for every Kafka-driven update (differs from the
            # [0, 1] default kept in self.user_feature).
            user_feature = [1, 1]

            kafka_consumer_obj = KafkaManager.get_kafka_consumer_ins(topic_name)
            while True:
                msg_dict = kafka_consumer_obj.poll(timeout_ms=100)
                for consume_msg in msg_dict.values():
                    for ori_msg in consume_msg:
                        try:
                            logging.info(ori_msg)
                            raw_val_dict = json.loads(ori_msg.value)
                            self._process_message(raw_val_dict, user_feature)
                        except Exception:
                            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False