# -*- coding: UTF-8 -*-
# !/usr/bin/env python

from kafka import KafkaConsumer
import random
from libs.cache import redis_client
import logging
from linucb.views.linucb import LinUCB
import json
from trans2es.models.tag import TopicTag, Tag
from trans2es.models.topic import TopicHomeRecommend
import traceback
from django.conf import settings
from libs.es import ESPerform
from search.utils.common import *
import libs.tools as Tools


class KafkaManager(object):
    """Lazily create and cache a single KafkaConsumer shared by all callers."""

    # Cached consumer (attribute name kept unchanged for compatibility with callers).
    consumser_obj = None

    @classmethod
    def get_kafka_consumer_ins(cls, topic_name=None):
        """Return the cached KafkaConsumer, creating it on first use.

        :param topic_name: topic to consume; defaults to ``settings.KAFKA_TOPIC_NAME``.
            Only honoured on the first call - later calls return the consumer that
            already exists, whatever topic they ask for.
        :return: the shared KafkaConsumer instance.
        """
        if not cls.consumser_obj:
            topic_name = topic_name if topic_name else settings.KAFKA_TOPIC_NAME
            cls.consumser_obj = KafkaConsumer(topic_name, bootstrap_servers=settings.KAFKA_BROKER_LIST)
        return cls.consumser_obj


class CollectData(object):
    """Consume user behaviour events from Kafka and maintain each device's LinUCB
    tag state plus its cached tag/topic recommendation lists in Redis."""

    # Maximum number of recommended tags used as the topic query source.
    MAX_RECOMMEND_TAG_NUM = 100
    # Lifetime of the cached tag recommendation list: 7 days, in seconds.
    TAG_RECOMMEND_EXPIRE_SECONDS = 7 * 24 * 60 * 60

    def __init__(self):
        self.linucb_matrix_redis_prefix = "physical:linucb:device_id:"
        self.linucb_recommend_redis_prefix = "physical:linucb:tag_recommend:device_id:"
        self.linucb_recommend_topic_id_prefix = "physical:linucb:topic_recommend:device_id:"
        self.tag_topic_id_redis_prefix = "physical:tag_id:topic_id_list:"
        self.click_recommend_redis_key_prefix = "physical:click_recommend:device_id:"
        # Default user feature vector, used when a caller passes none.
        self.user_feature = [0, 1]

    def _get_user_linucb_info(self, device_id):
        """Return the device's LinUCB state hash from Redis, or an empty dict on error.

        Keys of the hash are tag IDs; each value holds the 4 LinUCB matrices for that tag.
        """
        try:
            redis_key = self.linucb_matrix_redis_prefix + str(device_id)
            return redis_client.hgetall(redis_key)
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return dict()

    def _compute_recommend_tag_list(self, device_id, user_feature, user_id):
        """Return LinUCB-ranked tag ids for the device, seeding its state if it has none."""
        redis_linucb_tag_data_dict = self._get_user_linucb_info(device_id)
        if len(redis_linucb_tag_data_dict) == 0:
            # No LinUCB state yet: initialise it with the default tags and recommend those.
            recommend_tag_list = LinUCB.get_default_tag_list(user_id)
            LinUCB.init_device_id_linucb_info(redis_client, self.linucb_matrix_redis_prefix,
                                              device_id, recommend_tag_list)
            return recommend_tag_list
        user_feature = user_feature if user_feature else self.user_feature
        recommend_tag_dict, _ = LinUCB.linucb_recommend_tag(device_id, redis_linucb_tag_data_dict, user_feature,
                                                              list(redis_linucb_tag_data_dict.keys()))
        return list(recommend_tag_dict.keys())

    def _cache_click_recommend_topics(self, device_id, click_topic_tag_list, have_read_topic_id_list):
        """Fetch up to 2 unread topics for the clicked tags and cache them under the click key.

        Found topic ids are appended to ``have_read_topic_id_list`` (mutated in place) so the
        later tag-based query does not return them again.

        :return: (topic_id_list, topic_dict_list); both empty when nothing was found.
        """
        topic_id_list, topic_dict_list = ESPerform.get_tag_topic_list_dict(
            click_topic_tag_list, have_read_topic_id_list, size=2)
        if len(topic_id_list) == 0:
            return list(), list()
        topic_id_list = list(topic_id_list)
        topic_dict_list = list(topic_dict_list)
        have_read_topic_id_list.extend(topic_id_list)
        click_recommend_redis_key = self.click_recommend_redis_key_prefix + str(device_id)
        click_redis_data_dict = {
            "data": json.dumps(topic_id_list),
            "datadict": json.dumps(topic_dict_list),
            "cursor": 0
        }
        redis_client.hmset(click_recommend_redis_key, click_redis_data_dict)
        return topic_id_list, topic_dict_list

    @staticmethod
    def _get_cached_topic_recommend(topic_recommend_redis_key):
        """Return (cached topic id list, cursor) stored under the key; ([], -1) if absent."""
        redis_topic_data_dict = redis_client.hgetall(topic_recommend_redis_key)
        redis_topic_list = list()
        cursor = -1
        if b"data" in redis_topic_data_dict:
            raw_data = redis_topic_data_dict[b"data"]
            redis_topic_list = json.loads(raw_data) if raw_data else []
            cursor = int(str(redis_topic_data_dict[b"cursor"], encoding="utf-8"))
        return redis_topic_list, cursor

    def update_recommend_tag_list(self, device_id, user_feature=None, user_id=None, click_topic_tag_list=None,
                                  new_user_click_tag_list=None):
        """Recompute the device's recommended tags and refresh its cached topic lists.

        :param device_id: device whose recommendation state is updated.
        :param user_feature: LinUCB context vector; defaults to ``self.user_feature``.
        :param user_id: optional user id, used for default tags and the read-history lookup.
        :param click_topic_tag_list: tag ids of a just-clicked topic; up to 2 unread topics
            for them are cached separately and placed at the head of the topic list.
        :param new_user_click_tag_list: tag ids picked by a new user; when non-empty they
            replace the LinUCB tags as the topic query source.
        :return: True on success (or when there is nothing to recommend), False if any
            step raised (the error is logged).
        """
        # None instead of a shared mutable ``[]`` default.
        if new_user_click_tag_list is None:
            new_user_click_tag_list = []
        try:
            recommend_tag_list = self._compute_recommend_tag_list(device_id, user_feature, user_id)
            if len(recommend_tag_list) == 0:
                return True

            tag_recommend_redis_key = self.linucb_recommend_redis_prefix + str(device_id)
            redis_client.set(tag_recommend_redis_key, json.dumps(recommend_tag_list))
            # TODO: set the expiry together with the value; investigate whether SET supports it.
            redis_client.expire(tag_recommend_redis_key, self.TAG_RECOMMEND_EXPIRE_SECONDS)

            have_read_topic_id_list = Tools.get_have_read_topic_id_list(device_id, user_id,
                                                                        TopicPageType.HOME_RECOMMEND)
            # Also exclude topics that are online in TopicHomeRecommend.
            promote_recommend_topic_id_list = TopicHomeRecommend.objects.using(settings.SLAVE_DB_NAME).filter(
                is_online=1).values_list("topic_id", flat=True)
            have_read_topic_id_list.extend(promote_recommend_topic_id_list)

            recommend_topic_id_list = list()
            recommend_topic_id_list_dict = list()
            if click_topic_tag_list:
                recommend_topic_id_list, recommend_topic_id_list_dict = self._cache_click_recommend_topics(
                    device_id, click_topic_tag_list, have_read_topic_id_list)

            tag_id_list = recommend_tag_list[0:self.MAX_RECOMMEND_TAG_NUM]
            topic_recommend_redis_key = self.linucb_recommend_topic_id_prefix + str(device_id)
            redis_topic_list, cursor = self._get_cached_topic_recommend(topic_recommend_redis_key)
            # Without click topics and with the cached list still at cursor 0, treat its
            # first 2 topics as read so the new query does not return them again.
            if len(recommend_topic_id_list) == 0 and cursor == 0 and len(redis_topic_list) > 0:
                have_read_topic_id_list.extend(redis_topic_list[:2])

            query_tag_id_list = new_user_click_tag_list if len(new_user_click_tag_list) > 0 else tag_id_list
            tag_topic_id_list, tag_topic_dict = ESPerform.get_tag_topic_list_dict(query_tag_id_list,
                                                                                  have_read_topic_id_list)
            if len(recommend_topic_id_list) > 0 or len(new_user_click_tag_list) > 0:
                redis_data_dict = {
                    "data": json.dumps(recommend_topic_id_list + tag_topic_id_list),
                    "datadict": json.dumps(recommend_topic_id_list_dict + tag_topic_dict),
                    "cursor": 0
                }
                redis_client.hmset(topic_recommend_redis_key, redis_data_dict)
            # NOTE(review): when neither click topics nor new-user tags are present, the topics
            # queried above are discarded and the cached topic list is left unchanged -
            # confirm whether the cache should be refreshed in that case.
            return True
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return False

    def update_user_linucb_tag_info(self, reward, device_id, tag_id, user_feature=None):
        """Apply one LinUCB reward for ``tag_id`` to the device's state in Redis.

        :return: the result of ``LinUCB.update_linucb_info``, or False if it raised.
        """
        try:
            user_feature = user_feature if user_feature else self.user_feature
            return LinUCB.update_linucb_info(user_feature, reward, tag_id, device_id,
                                             self.linucb_matrix_redis_prefix, redis_client)
        except Exception:
            logging.error("update_user_linucb_tag_info error!,err_msg:%s", traceback.format_exc())
            return False

    @staticmethod
    def _get_ai_tag_id_set(tag_id_list):
        """Return the ids in ``tag_id_list`` whose Tag row has ``is_ai == 1`` (one batched query)."""
        if not tag_id_list:
            return set()
        tag_sql_query_results = Tag.objects.using(settings.SLAVE_DB_NAME).filter(
            id__in=list(set(tag_id_list))).values_list("id", "is_ai")
        return {tag_pk for tag_pk, is_ai in tag_sql_query_results if is_ai == 1}

    def _on_click_feed_topic_card(self, raw_val_dict, user_feature):
        """Reward (1) the clicked topic's collection and AI tags, then refresh recommendations."""
        topic_id = raw_val_dict["params"]["business_id"] or raw_val_dict["params"]["topic_id"]
        device_id = raw_val_dict["device"]["device_id"]
        user_id = raw_val_dict["user_id"] if "user_id" in raw_val_dict else None
        logging.info("consume topic_id:%s,device_id:%s", str(topic_id), str(device_id))

        topic_tag_results = list(TopicTag.objects.using(settings.SLAVE_DB_NAME).filter(
            topic_id=topic_id).values_list("tag_id", "is_online", "is_collection"))
        # Online collection tags first, in query order.
        click_topic_tag_list = [tag_id for tag_id, is_online, is_collection in topic_tag_results
                                if is_online and is_collection == 1]
        logging.info("click_topic_tag_list:%s", str(click_topic_tag_list))

        # Then every online AI tag that is not already in the list.
        online_tag_id_list = [tag_id for tag_id, is_online, _ in topic_tag_results if is_online]
        ai_tag_id_set = self._get_ai_tag_id_set(online_tag_id_list)
        for tag_id in online_tag_id_list:
            if tag_id in ai_tag_id_set and tag_id not in click_topic_tag_list:
                click_topic_tag_list.append(tag_id)
        logging.info("click_topic_tag_list:%s", str(click_topic_tag_list))

        reward = 1  # a click is positive feedback
        logging.info("positive tag_list,device_id:%s,topic_id:%s,tag_list:%s",
                     str(device_id), str(topic_id), str(click_topic_tag_list))
        for tag_id in click_topic_tag_list:
            self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
        # Refresh the recommended tags only after the tag behaviour updates above.
        self.update_recommend_tag_list(device_id, user_feature, user_id, click_topic_tag_list=click_topic_tag_list)

    def _on_page_precise_exposure(self, raw_val_dict, user_feature):
        """Reward (0) the collection and AI tags of exposed topics, then refresh recommendations."""
        exposure_cards = raw_val_dict["params"]["exposure_cards"]
        if isinstance(exposure_cards, str):
            exposure_cards_list = json.loads(exposure_cards)
        elif isinstance(exposure_cards, list):
            exposure_cards_list = exposure_cards
        else:
            exposure_cards_list = list()
        device_id = raw_val_dict["device"]["device_id"]
        user_id = raw_val_dict["user_id"] if "user_id" in raw_val_dict else None
        logging.warning("type msg:%s", raw_val_dict.get("type"))

        exposure_topic_id_list = list()
        for item in exposure_cards_list:
            if "card_id" not in item:
                continue
            exposure_topic_id = item["card_id"]
            logging.info("consume exposure topic_id:%s,device_id:%s", str(exposure_topic_id), str(device_id))
            if exposure_topic_id:
                exposure_topic_id_list.append(exposure_topic_id)

        exposure_sql_query_results = list(TopicTag.objects.using(settings.SLAVE_DB_NAME).filter(
            topic_id__in=exposure_topic_id_list).values_list("topic_id", "tag_id", "is_online", "is_collection"))
        ai_tag_id_set = self._get_ai_tag_id_set(
            [tag_id for _, tag_id, is_online, _ in exposure_sql_query_results if is_online])

        topic_tag_id_dict = dict()
        tag_list = list()
        for topic_id, tag_id, is_online, is_collection in exposure_sql_query_results:
            # Collection tags are appended per occurrence; AI tags only once.
            if is_online and is_collection == 1:
                tag_list.append(tag_id)
            if is_online and tag_id in ai_tag_id_set and tag_id not in tag_list:
                tag_list.append(tag_id)
            topic_tag_id_dict.setdefault(topic_id, list()).append(tag_id)

        reward = 0  # exposure without a click is negative feedback
        logging.info("negative tag_list,device_id:%s,topic_tag_id_dict:%s", str(device_id), str(topic_tag_id_dict))
        for tag_id in tag_list:
            self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
        # Refresh the recommended tags only after the tag behaviour updates above.
        self.update_recommend_tag_list(device_id, user_feature, user_id)

    def _on_interest_choice_click_next(self, raw_val_dict, user_feature):
        """Reward (1) the tags a new user picked and seed topic recommendations from them."""
        raw_tagid_list = raw_val_dict["params"]["tagid_list"]
        if isinstance(raw_tagid_list, str):
            tagid_list = json.loads(raw_tagid_list)
        elif isinstance(raw_tagid_list, list):
            tagid_list = raw_tagid_list
        else:
            tagid_list = list()
            logging.warning("unexpected tagid_list format, msg type:%s", raw_val_dict.get("type", "missing type"))
        device_id = raw_val_dict["device"]["device_id"]
        user_id = raw_val_dict["user_id"] if "user_id" in raw_val_dict else None
        if len(tagid_list) > 0:
            reward = 1  # an explicit interest choice is positive feedback
            for tag_id in tagid_list:
                self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
            # Refresh the recommended tags only after the tag behaviour updates above.
            self.update_recommend_tag_list(device_id, user_feature, user_id, new_user_click_tag_list=tagid_list)

    def _dispatch_msg(self, raw_val_dict, user_feature):
        """Route one decoded Kafka message to the handler for its ``type``."""
        msg_type = raw_val_dict.get("type")
        if msg_type == "on_click_feed_topic_card":
            self._on_click_feed_topic_card(raw_val_dict, user_feature)
        elif msg_type == "page_precise_exposure":
            self._on_page_precise_exposure(raw_val_dict, user_feature)
        elif msg_type == "interest_choice_click_next":
            self._on_interest_choice_click_next(raw_val_dict, user_feature)
        else:
            logging.warning("unknown type msg:%s", raw_val_dict.get("type", "missing type"))

    def consume_data_from_kafka(self, topic_name=None):
        """Consume behaviour events from Kafka forever, updating LinUCB state per event.

        Handled types: ``on_click_feed_topic_card`` (reward 1), ``page_precise_exposure``
        (reward 0) and ``interest_choice_click_next`` (reward 1). A failure while
        processing one message is logged and the loop moves on to the next message.

        :param topic_name: Kafka topic; see ``KafkaManager.get_kafka_consumer_ins``.
        :return: False if the consumer could not be created or polling failed; otherwise
            the loop never returns.
        """
        try:
            # Context vector used for every LinUCB update coming from this consumer.
            user_feature = [1, 1]
            kafka_consumer_obj = KafkaManager.get_kafka_consumer_ins(topic_name)
            while True:
                msg_dict = kafka_consumer_obj.poll(timeout_ms=100)
                for consume_msg in msg_dict.values():
                    for ori_msg in consume_msg:
                        try:
                            logging.info(ori_msg)
                            self._dispatch_msg(json.loads(ori_msg.value), user_feature)
                        except Exception:
                            logging.error("catch exception,err_msg:%s", traceback.format_exc())
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return False