# -*- coding: UTF-8 -*-
# !/usr/bin/env python

import json
import logging
import traceback

from kafka import KafkaConsumer

from libs.cache import redis_client
from linucb.views.linucb import LinUCB
from trans2es.models.tag import TopicTag


class KafkaManager(object):
    """Kafka connection settings plus a lazily created, shared consumer."""

    # Kafka connection info
    kafka_broker_list = "192.168.13.114:9092,192.168.13.116:9092,192.168.13.115:9092"
    topic_name = "alpha-maidian-data"

    # Shared consumer instance; created on the first call to
    # get_kafka_consumer_ins and reused afterwards.
    consumser_obj = None

    @classmethod
    def get_kafka_consumer_ins(cls, topic_name=None):
        """Return the shared KafkaConsumer, creating it on first use.

        Args:
            topic_name: Topic to subscribe to. Falls back to
                ``cls.topic_name`` when empty or None.

        Returns:
            The cached consumer instance.

        Note:
            ``topic_name`` is only honoured by the call that creates the
            consumer; later calls return the cached instance unchanged.
        """
        if not cls.consumser_obj:
            topic_name = topic_name if topic_name else cls.topic_name
            cls.consumser_obj = KafkaConsumer(
                topic_name, bootstrap_servers=cls.kafka_broker_list)

        return cls.consumser_obj


class CollectData(object):
    """Consume tracking events from Kafka and keep per-device LinUCB data in Redis."""

    # Lifetime of the cached recommendation list: 7 days, in seconds.
    recommend_expire_seconds = 7 * 24 * 60 * 60

    def __init__(self):
        # Redis key prefix for the per-device LinUCB hash.
        self.linucb_matrix_redis_prefix = "physical:linucb:device_id:"
        # Redis key prefix for the per-device recommended tag list.
        self.linucb_recommend_redis_prefix = "physical:linucb:tag_recommend:device_id:"

        # Default user feature vector, used when the caller supplies none.
        self.user_feature = [0, 1]

    def _get_user_linucb_info(self, device_id):
        """Fetch the LinUCB hash stored in Redis for ``device_id``.

        Returns:
            The result of ``hgetall`` on the device key (per the original
            author's note: keys are tag IDs, values are the 4 matrices), or
            an empty dict if the lookup raised.
        """
        try:
            redis_key = self.linucb_matrix_redis_prefix + str(device_id)
            return redis_client.hgetall(redis_key)
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return dict()

    def update_recommend_tag_list(self, device_id, user_feature=None):
        """Recompute the recommended tag list for a device and cache it in Redis.

        When the device has no LinUCB data yet, the default tag list is used
        and the device's LinUCB data is initialised from it. Otherwise the
        list comes from ``LinUCB.linucb_recommend_tag``. A non-empty list is
        stored as JSON with a 7-day expiry.

        Args:
            device_id: Device identifier.
            user_feature: Feature vector; ``self.user_feature`` when falsy.

        Returns:
            True on success, False if any step raised (the error is logged).
        """
        try:
            redis_linucb_tag_data_dict = self._get_user_linucb_info(device_id)
            if len(redis_linucb_tag_data_dict) == 0:
                recommend_tag_list = LinUCB.get_default_tag_list()
                LinUCB.init_device_id_linucb_info(
                    redis_client, self.linucb_matrix_redis_prefix,
                    device_id, recommend_tag_list)
            else:
                user_feature = user_feature if user_feature else self.user_feature
                recommend_tag_list = LinUCB.linucb_recommend_tag(
                    device_id, redis_linucb_tag_data_dict, user_feature,
                    list(redis_linucb_tag_data_dict.keys()))

            logging.info("duan add,device_id:%s,recommend_tag_list:%s" % (
                str(device_id), str(recommend_tag_list)))

            if len(recommend_tag_list) > 0:
                tag_recommend_redis_key = self.linucb_recommend_redis_prefix + str(device_id)
                redis_client.set(tag_recommend_redis_key, json.dumps(recommend_tag_list))
                # TODO: check whether redis_client.set accepts an expiry so the
                # write and the TTL can be applied in a single call.
                redis_client.expire(tag_recommend_redis_key, self.recommend_expire_seconds)

            return True
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    def update_user_linucb_tag_info(self, reward, device_id, tag_id, user_feature=None):
        """Apply one reward observation for ``tag_id`` to a device's LinUCB data.

        Args:
            reward: Observed reward (1 for a click, 0 for an exposure).
            device_id: Device identifier.
            tag_id: Tag the reward applies to.
            user_feature: Feature vector; ``self.user_feature`` when falsy.

        Returns:
            Whatever ``LinUCB.update_linucb_info`` returns, or False if it
            raised (the error is logged together with its traceback).
        """
        try:
            user_feature = user_feature if user_feature else self.user_feature
            return LinUCB.update_linucb_info(
                user_feature, reward, tag_id, device_id,
                self.linucb_matrix_redis_prefix, redis_client)
        except Exception:
            # exc_info keeps the message text unchanged while attaching the
            # traceback, matching what the other handlers in this class log.
            logging.error("update_user_linucb_tag_info error!", exc_info=True)
            return False

    def _apply_reward(self, reward, device_id, tag_list, user_feature):
        """Apply ``reward`` to every tag, then refresh the device's recommendations."""
        for tag_id in tag_list:
            self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)

        # Refresh the recommended tags only after all per-tag updates are done.
        self.update_recommend_tag_list(device_id, user_feature)

    def _handle_click_message(self, raw_val_dict, user_feature):
        """Handle an ``on_click_feed_topic_card`` event as a positive sample (reward 1)."""
        params = raw_val_dict["params"]
        # .get lets the topic_id fallback apply when business_id is absent,
        # not only when it is present but empty.
        topic_id = params.get("business_id") or params["topic_id"]
        device_id = raw_val_dict["device"]["device_id"]

        logging.info("consume topic_id:%s,device_id:%s" % (str(topic_id), str(device_id)))

        tag_list = [
            sql_item.tag_id
            for sql_item in TopicTag.objects.filter(is_online=True, topic_id=topic_id)
        ]

        logging.info("positive tag_list,device_id:%s,topic_id:%s,tag_list:%s" % (
            str(device_id), str(topic_id), str(tag_list)))

        self._apply_reward(1, device_id, tag_list, user_feature)

    def _handle_exposure_message(self, raw_val_dict, user_feature):
        """Handle a ``page_precise_exposure`` event as negative samples (reward 0)."""
        exposure_cards = raw_val_dict["params"]["exposure_cards"]
        # exposure_cards may arrive as a JSON-encoded string or as a list;
        # anything else is treated as "no cards".
        if isinstance(exposure_cards, str):
            exposure_cards_list = json.loads(exposure_cards)
        elif isinstance(exposure_cards, list):
            exposure_cards_list = exposure_cards
        else:
            exposure_cards_list = list()

        device_id = raw_val_dict["device"]["device_id"]

        exposure_topic_id_list = list()
        for item in exposure_cards_list:
            if "card_id" not in item:
                continue
            exposure_topic_id = item["card_id"]
            logging.info("consume exposure topic_id:%s,device_id:%s" % (
                str(exposure_topic_id), str(device_id)))
            exposure_topic_id_list.append(exposure_topic_id)

        # topic_tag_id_dict (topic_id -> tag ids) is only used for logging;
        # tag_list is the flat list that actually receives the reward.
        topic_tag_id_dict = dict()
        tag_list = list()
        sql_query_results = TopicTag.objects.filter(
            is_online=True, topic_id__in=exposure_topic_id_list)
        for sql_item in sql_query_results:
            tag_list.append(sql_item.tag_id)
            topic_tag_id_dict.setdefault(sql_item.topic_id, list()).append(sql_item.tag_id)

        logging.info("negative tag_list,device_id:%s,topic_tag_id_dict:%s" % (
            str(device_id), str(topic_tag_id_dict)))

        self._apply_reward(0, device_id, tag_list, user_feature)

    def _process_message(self, ori_msg, user_feature):
        """Decode one Kafka record and dispatch it by its ``type`` field."""
        logging.info(ori_msg)

        raw_val_dict = json.loads(ori_msg.value)
        msg_type = raw_val_dict.get("type")

        if msg_type == "on_click_feed_topic_card":
            self._handle_click_message(raw_val_dict, user_feature)
        elif msg_type == "page_precise_exposure":
            self._handle_exposure_message(raw_val_dict, user_feature)
        else:
            logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))

    def consume_data_from_kafka(self, topic_name=None):
        """Poll Kafka forever and feed click / exposure events into LinUCB.

        Each record is processed in isolation: an exception raised while
        handling one record is logged and the loop moves on to the next.

        Args:
            topic_name: Topic to consume; the KafkaManager default when None.

        Returns:
            False if creating the consumer or polling raised. The loop has no
            normal exit, so the method does not otherwise return.
            ``KeyboardInterrupt`` and ``SystemExit`` are not intercepted, so
            the process can be stopped.
        """
        try:
            # Feature vector used for every event seen by this consumer.
            user_feature = [1, 1]

            kafka_consumer_obj = KafkaManager.get_kafka_consumer_ins(topic_name)
            while True:
                msg_dict = kafka_consumer_obj.poll(timeout_ms=100)
                for consume_msg in msg_dict.values():
                    for ori_msg in consume_msg:
                        try:
                            self._process_message(ori_msg, user_feature)
                        except Exception:
                            logging.error(
                                "catch exception,err_msg:%s" % traceback.format_exc())
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False