# collect_data.py (8.12 KB)
# -*- coding: UTF-8 -*-
# !/usr/bin/env python

from kafka import KafkaConsumer
from libs.cache import redis_client
import logging
from linucb.views.linucb import LinUCB
import json
from trans2es.models.tag import TopicTag
import traceback

class KafkaManager(object):
    """Lazily build and cache one KafkaConsumer shared by every caller.

    The consumer is stored on the class itself, so the first call creates it
    and every later call returns that same object. Only the first call's
    ``topic_name`` takes effect; later calls get the cached consumer no
    matter which topic they pass.
    """

    # Kafka bootstrap brokers, as a comma-separated host:port list.
    kafka_broker_list = "192.168.13.114:9092,192.168.13.116:9092,192.168.13.115:9092"
    # Topic used when the caller does not supply one.
    topic_name = "alpha-maidian-data"
    # Cached consumer; None until the first get_kafka_consumer_ins() call.
    # (Attribute name, typo included, is kept as-is for compatibility.)
    consumser_obj = None

    @classmethod
    def get_kafka_consumer_ins(cls, topic_name=None):
        """Return the shared consumer, creating it on first use.

        :param topic_name: topic to subscribe the new consumer to; a falsy
            value falls back to ``cls.topic_name``. Ignored once a consumer
            has already been cached.
        :return: the cached ``KafkaConsumer`` instance.
        """
        if cls.consumser_obj:
            return cls.consumser_obj

        chosen_topic = topic_name or cls.topic_name
        cls.consumser_obj = KafkaConsumer(chosen_topic, bootstrap_servers=cls.kafka_broker_list)
        return cls.consumser_obj

class CollectData(object):
    """Consume tracking events from Kafka and feed them into per-device LinUCB tag models.

    Two event types are handled:

    * ``on_click_feed_topic_card``: every online tag of the clicked topic is
      updated with reward ``CLICK_REWARD``.
    * ``page_precise_exposure``: every online tag of each exposed topic is
      updated with reward ``EXPOSURE_REWARD``.

    After each handled event, the device's recommended tag list is recomputed
    and cached in redis.
    """

    # Reward passed to LinUCB when a feed topic card is clicked (positive feedback).
    CLICK_REWARD = 1
    # Reward passed to LinUCB for topics that were only exposed (negative feedback).
    EXPOSURE_REWARD = 0
    # Lifetime of the cached recommendation list in redis: 7 days, in seconds.
    RECOMMEND_TTL_SECONDS = 7 * 24 * 60 * 60

    def __init__(self):
        # Redis hash key prefix for the per-device LinUCB data.
        self.linucb_matrix_redis_prefix = "physical:linucb:device_id:"
        # Redis string key prefix for the per-device JSON-encoded recommended tag list.
        self.linucb_recommend_redis_prefix = "physical:linucb:tag_recommend:device_id:"
        # Default user feature vector, used when a caller passes none.
        self.user_feature = [0, 1]

    def _get_user_linucb_info(self, device_id):
        """Return the device's LinUCB redis hash, or an empty dict if the read fails."""
        try:
            redis_key = self.linucb_matrix_redis_prefix + str(device_id)
            # Keys are tag IDs; each value holds that tag's 4 LinUCB matrices.
            return redis_client.hgetall(redis_key)
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return dict()

    def update_recommend_tag_list(self, device_id, user_feature=None):
        """Recompute the device's recommended tags and cache them in redis.

        A device with no LinUCB data yet gets the default tag list, and its
        LinUCB data is initialised from that list. Otherwise the
        recommendation is computed over the tags already stored for the device.

        :param device_id: device whose recommendation is refreshed.
        :param user_feature: feature vector for LinUCB; falsy -> ``self.user_feature``.
        :return: True on success, False if any step raised (the error is logged).
        """
        try:
            redis_linucb_tag_data_dict = self._get_user_linucb_info(device_id)
            if not redis_linucb_tag_data_dict:
                recommend_tag_list = LinUCB.get_default_tag_list()
                LinUCB.init_device_id_linucb_info(
                    redis_client, self.linucb_matrix_redis_prefix, device_id, recommend_tag_list)
            else:
                user_feature = user_feature or self.user_feature
                recommend_tag_list = LinUCB.linucb_recommend_tag(
                    device_id, redis_linucb_tag_data_dict, user_feature,
                    list(redis_linucb_tag_data_dict.keys()))

            logging.info("duan add,device_id:%s,recommend_tag_list:%s", device_id, recommend_tag_list)
            if recommend_tag_list:
                tag_recommend_redis_key = self.linucb_recommend_redis_prefix + str(device_id)
                redis_client.set(tag_recommend_redis_key, json.dumps(recommend_tag_list))
                # TODO: check whether set() can apply the TTL atomically (e.g. an
                # `ex=` argument, if redis_client is redis-py) instead of a separate expire().
                redis_client.expire(tag_recommend_redis_key, self.RECOMMEND_TTL_SECONDS)

            return True
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return False

    def update_user_linucb_tag_info(self, reward, device_id, tag_id, user_feature=None):
        """Feed one (tag, reward) observation for a device into LinUCB.

        :param reward: reward value for this observation.
        :param device_id: device whose model is updated.
        :param tag_id: tag being rewarded.
        :param user_feature: feature vector; falsy -> ``self.user_feature``.
        :return: whatever ``LinUCB.update_linucb_info`` returns, or False if it raised.
        """
        try:
            user_feature = user_feature or self.user_feature
            return LinUCB.update_linucb_info(
                user_feature, reward, tag_id, device_id, self.linucb_matrix_redis_prefix, redis_client)
        except Exception:
            # Include the traceback, as the other handlers in this class do.
            logging.error("update_user_linucb_tag_info error!,err_msg:%s", traceback.format_exc())
            return False

    def consume_data_from_kafka(self, topic_name=None):
        """Poll Kafka forever and dispatch every tracking event received.

        Errors while handling a single message are logged and that message is
        skipped. The method only returns (False) if obtaining the consumer or
        polling raises; otherwise it loops indefinitely.

        :param topic_name: topic forwarded to ``KafkaManager.get_kafka_consumer_ins``.
        """
        try:
            # Feature vector used for every event consumed here (differs from
            # the [0, 1] default set in __init__).
            user_feature = [1, 1]

            kafka_consumer_obj = KafkaManager.get_kafka_consumer_ins(topic_name)
            while True:
                msg_dict = kafka_consumer_obj.poll(timeout_ms=100)
                for consume_msg in msg_dict.values():
                    for ori_msg in consume_msg:
                        self._handle_kafka_message(ori_msg, user_feature)
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return False

    def _handle_kafka_message(self, ori_msg, user_feature):
        """Decode one Kafka record and route it by its ``type`` field; logs instead of raising."""
        try:
            logging.info(ori_msg)

            raw_val_dict = json.loads(ori_msg.value)
            msg_type = raw_val_dict.get("type")

            if msg_type == "on_click_feed_topic_card":
                self._handle_topic_click(raw_val_dict, user_feature)
            elif msg_type == "page_precise_exposure":
                self._handle_topic_exposure(raw_val_dict, user_feature)
            else:
                logging.warning("unknown type msg:%s", raw_val_dict.get("type", "missing type"))
        except Exception:
            # Exception (not a bare except) so KeyboardInterrupt/SystemExit can
            # still stop the infinite consume loop.
            logging.error("catch exception,err_msg:%s", traceback.format_exc())

    def _handle_topic_click(self, raw_val_dict, user_feature):
        """Positively reward every online tag of the clicked topic."""
        params = raw_val_dict["params"]
        # Prefer business_id and fall back to topic_id. Use .get() so a message
        # that carries only one of the two keys is still handled.
        topic_id = params.get("business_id") or params.get("topic_id")
        device_id = raw_val_dict["device"]["device_id"]

        logging.info("consume topic_id:%s,device_id:%s", topic_id, device_id)

        tag_list = [sql_item.tag_id
                    for sql_item in TopicTag.objects.filter(is_online=True, topic_id=topic_id)]

        logging.info("positive tag_list,device_id:%s,topic_id:%s,tag_list:%s", device_id, topic_id, tag_list)
        self._update_tags_and_recommend(device_id, tag_list, self.CLICK_REWARD, user_feature)

    def _handle_topic_exposure(self, raw_val_dict, user_feature):
        """Negatively reward every online tag of each exposed topic card."""
        exposure_cards_list = self._parse_exposure_cards(raw_val_dict["params"]["exposure_cards"])
        device_id = raw_val_dict["device"]["device_id"]

        exposure_topic_id_list = list()
        for item in exposure_cards_list:
            if "card_id" not in item:
                continue
            exposure_topic_id = item["card_id"]
            logging.info("consume exposure topic_id:%s,device_id:%s", exposure_topic_id, device_id)
            exposure_topic_id_list.append(exposure_topic_id)

        # tag_list keeps one entry per (topic, tag) row, so a tag shared by
        # several exposed topics is updated once per topic.
        topic_tag_id_dict = dict()
        tag_list = list()
        sql_query_results = TopicTag.objects.filter(is_online=True, topic_id__in=exposure_topic_id_list)
        for sql_item in sql_query_results:
            tag_list.append(sql_item.tag_id)
            topic_tag_id_dict.setdefault(sql_item.topic_id, list()).append(sql_item.tag_id)

        logging.info("negative tag_list,device_id:%s,topic_tag_id_dict:%s", device_id, topic_tag_id_dict)
        self._update_tags_and_recommend(device_id, tag_list, self.EXPOSURE_REWARD, user_feature)

    @staticmethod
    def _parse_exposure_cards(exposure_cards):
        """Normalise ``exposure_cards`` (JSON string or list) to a list; other types give []."""
        if isinstance(exposure_cards, str):
            return json.loads(exposure_cards)
        if isinstance(exposure_cards, list):
            return exposure_cards
        return list()

    def _update_tags_and_recommend(self, device_id, tag_list, reward, user_feature):
        """Apply ``reward`` to each tag, then refresh the device's recommendation cache."""
        for tag_id in tag_list:
            self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)

        # Refresh only after all tag updates so the recommendation sees the new state.
        self.update_recommend_tag_list(device_id, user_feature)