• Kai's avatar
    revise · b7deeebb
    Kai authored
    b7deeebb
collect_data.py 16.2 KB
# -*- coding: UTF-8 -*-
# !/usr/bin/env python

from kafka import KafkaConsumer
import random
from libs.cache import redis_client
import logging
from linucb.views.linucb import LinUCB
import json
from trans2es.models.tag import TopicTag,Tag
from trans2es.models.topic import TopicHomeRecommend
import traceback
from django.conf import settings
from libs.es import ESPerform
from search.utils.common import *
import libs.tools as Tools

class KafkaManager(object):
    """Holds one lazily created KafkaConsumer shared by the whole process."""

    # Cached consumer, created on the first get_kafka_consumer_ins() call.
    # (Spelling of the attribute name is kept because other code may use it.)
    consumser_obj = None

    @classmethod
    def get_kafka_consumer_ins(cls, topic_name=None):
        """Return the shared KafkaConsumer, building it on first use.

        :param topic_name: topic for the consumer when it is first created;
            ``settings.KAFKA_TOPIC_NAME`` is used when this is falsy. Once a
            consumer exists, this argument is ignored and the cached one is
            returned.
        :return: the cached ``KafkaConsumer`` instance.
        """
        if cls.consumser_obj:
            return cls.consumser_obj

        subscribe_topic = topic_name or settings.KAFKA_TOPIC_NAME
        cls.consumser_obj = KafkaConsumer(subscribe_topic, bootstrap_servers=settings.KAFKA_BROKER_LIST)
        return cls.consumser_obj

class CollectData(object):
    """Turn user-behaviour events from Kafka into LinUCB updates and cached recommendations.

    Click-style events give reward 1 to the involved tags, exposure events give
    reward 0 to the tags of the exposed topics, and interest-choice events give
    reward 1 to the chosen tags. After each update, the device's recommended tag
    list and recommended topic list are rewritten in Redis.
    """

    def __init__(self):
        # Redis hash per device holding the per-tag LinUCB matrices (hash key = tag id).
        self.linucb_matrix_redis_prefix = "physical:linucb:device_id:"
        # Deprecated, but update_recommend_tag_list still writes this key.
        self.linucb_recommend_redis_prefix = "physical:linucb:tag_recommend:device_id:"
        # Recommended topics per device.
        self.linucb_recommend_topic_id_prefix = "physical:linucb:topic_recommend:device_id:"
        self.tag_topic_id_redis_prefix = "physical:tag_id:topic_id_list:"
        self.click_recommend_redis_key_prefix = "physical:click_recommend:device_id:"
        # Default user feature vector, used when a caller passes none.
        self.user_feature = [0, 1]

    def _get_user_linucb_info(self, device_id):
        """Return the device's LinUCB hash from Redis (tag id -> stored matrices).

        :param device_id: device whose state is read.
        :return: the ``hgetall`` result, or an empty dict if Redis access fails.
        """
        try:
            redis_key = self.linucb_matrix_redis_prefix + str(device_id)
            return redis_client.hgetall(redis_key)
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return dict()

    def update_recommend_tag_list(self, device_id, user_feature=None, user_id=None, click_topic_tag_list=None,
                                  new_user_click_tag_list=None):
        """Recompute the device's recommended tags and rewrite its topic cache.

        If the device has no LinUCB state yet, it is seeded with the default tag
        list. Otherwise the tags are ranked by ``LinUCB.linucb_recommend_tag``.
        Topics for those tags (or for ``new_user_click_tag_list`` when given) are
        fetched from ES and stored in the device's topic-recommendation hash.

        :param device_id: device to update.
        :param user_feature: feature vector; falls back to ``self.user_feature``.
        :param user_id: optional user id, forwarded to the helpers.
        :param click_topic_tag_list: tags just clicked; up to 2 topics for them
            (``size=2``) are put at the front of the cached list.
        :param new_user_click_tag_list: tags picked on the interest-choice page;
            when non-empty they replace the LinUCB tags as the ES query source.
            ``None`` is treated as empty.
        :return: True on success, False if any step raised (the error is logged).
        """
        # Avoid a shared mutable default; an explicit None used to raise TypeError.
        if new_user_click_tag_list is None:
            new_user_click_tag_list = []
        try:
            redis_linucb_tag_data_dict = self._get_user_linucb_info(device_id)
            if not redis_linucb_tag_data_dict:
                # No LinUCB state for this device yet: seed it with the default tags.
                recommend_tag_list = LinUCB.get_default_tag_list(user_id)
                LinUCB.init_device_id_linucb_info(redis_client, self.linucb_matrix_redis_prefix, device_id,
                                                  recommend_tag_list)
            else:
                user_feature = user_feature if user_feature else self.user_feature
                recommend_tag_dict, _ = LinUCB.linucb_recommend_tag(device_id, redis_linucb_tag_data_dict, user_feature,
                                                                    list(redis_linucb_tag_data_dict.keys()))
                recommend_tag_list = list(recommend_tag_dict.keys())

            if not recommend_tag_list:
                return True

            tag_recommend_redis_key = self.linucb_recommend_redis_prefix + str(device_id)
            redis_client.set(tag_recommend_redis_key, json.dumps(recommend_tag_list))
            redis_client.expire(tag_recommend_redis_key, 7 * 24 * 60 * 60)

            # Already-read topics plus online promoted home topics; passed to ES as
            # the "have read" list (presumably excluded from results -- see ESPerform).
            have_read_topic_id_list = Tools.get_have_read_topic_id_list(device_id, user_id,
                                                                        TopicPageType.HOME_RECOMMEND)
            promote_recommend_topic_id_list = TopicHomeRecommend.objects.using(settings.SLAVE_DB_NAME).filter(
                is_online=1).values_list("topic_id", flat=True)
            have_read_topic_id_list.extend(promote_recommend_topic_id_list)

            recommend_topic_id_list = list()
            recommend_topic_id_list_dict = dict()
            if click_topic_tag_list:
                click_topic_id_list, click_topic_id_dict = ESPerform.get_tag_topic_list_dict(
                    click_topic_tag_list, have_read_topic_id_list, size=2)
                if click_topic_id_list:
                    recommend_topic_id_list.extend(click_topic_id_list)
                    recommend_topic_id_list_dict.update(click_topic_id_dict)

            # Interest-choice tags, when present, replace the top-100 LinUCB tags.
            query_tag_id_list = new_user_click_tag_list if new_user_click_tag_list else recommend_tag_list[0:100]
            tag_topic_id_list, tag_topic_dict = ESPerform.get_tag_topic_list_dict(query_tag_id_list,
                                                                                  have_read_topic_id_list)

            if recommend_topic_id_list or tag_topic_id_list or new_user_click_tag_list:
                topic_recommend_redis_key = self.linucb_recommend_topic_id_prefix + str(device_id)
                # Click-derived topics go first; the cursor restarts at 0.
                tag_topic_id_list = recommend_topic_id_list + tag_topic_id_list
                tag_topic_dict.update(recommend_topic_id_list_dict)
                redis_data_dict = {
                    "data": json.dumps(tag_topic_id_list),
                    "datadict": json.dumps(tag_topic_dict),
                    "cursor": 0
                }
                redis_client.hmset(topic_recommend_redis_key, redis_data_dict)

            return True
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    def update_user_linucb_tag_info(self, reward, device_id, tag_id, user_feature=None):
        """Apply one reward observation for ``tag_id`` to the device's LinUCB state.

        :return: whatever ``LinUCB.update_linucb_info`` returns, or False on error.
        """
        try:
            user_feature = user_feature if user_feature else self.user_feature
            return LinUCB.update_linucb_info(user_feature, reward, tag_id, device_id,
                                             self.linucb_matrix_redis_prefix, redis_client)
        except Exception:
            logging.error("update_user_linucb_tag_info error!,err_msg:%s" % traceback.format_exc())
            return False

    @staticmethod
    def _parse_list_param(value):
        """Normalise a list-valued event param: JSON-decode str, pass list through, else []."""
        if isinstance(value, str):
            return json.loads(value)
        if isinstance(value, list):
            return value
        return list()

    @staticmethod
    def _select_exposure_tag_ids(exposure_rows, ai_tag_ids):
        """Pick the tag ids that receive a reward-0 update for an exposure event.

        For each online row, a collection tag (``is_collection == 1``) is appended
        on every occurrence. A tag shared by several exposed topics is therefore
        appended repeatedly. An AI tag is appended only if it is not already
        present.

        :param exposure_rows: iterable of (topic_id, tag_id, is_online, is_collection).
        :param ai_tag_ids: set of tag ids whose Tag row has ``is_ai == 1``.
        :return: list of tag ids in encounter order.
        """
        tag_list = list()
        for _topic_id, tag_id, is_online, is_collection in exposure_rows:
            if not is_online:
                continue
            if is_collection == 1:
                tag_list.append(tag_id)
            if tag_id in ai_tag_ids and tag_id not in tag_list:
                tag_list.append(tag_id)
        return tag_list

    def _handle_click_msg(self, raw_val_dict, user_feature):
        """Reward (1) the tags behind a topic-card click or a tag-zone focus event."""
        click_topic_tag_list = list()
        device_id = raw_val_dict["device"]["device_id"]
        user_id = raw_val_dict.get("user_id")

        if raw_val_dict["type"] == "on_click_feed_topic_card":
            params = raw_val_dict["params"]
            topic_id = params["business_id"] or params["topic_id"]
            logging.info("consume topic_id:%s,device_id:%s" % (str(topic_id), str(device_id)))
            topic_tag_list = list(TopicTag.objects.using(settings.SLAVE_DB_NAME).filter(
                topic_id=topic_id, is_online=True).values_list("tag_id", flat=True))
            tag_query_results = Tag.objects.using(settings.SLAVE_DB_NAME).filter(
                id__in=topic_tag_list, is_online=True, is_deleted=False).values_list("id", "collection", "is_ai")
            # Only tags that are both collection tags and AI tags are rewarded.
            click_topic_tag_list.extend(
                tag_pk for tag_pk, collection, is_ai in tag_query_results if collection and is_ai)
            logging.info("positive tag_list,device_id:%s,topic_id:%s,tag_list:%s" % (
                str(device_id), str(topic_id), str(click_topic_tag_list)))
        else:
            # tag_zone_click_focus: tag named by params.query; only params.type == "do"
            # is rewarded (presumably the "follow" action -- confirm with the client).
            tag_name = raw_val_dict["params"]["query"]
            query_type = raw_val_dict["params"]["type"]
            if query_type == "do":
                tag_list = list(Tag.objects.using(settings.SLAVE_DB_NAME).filter(
                    name=tag_name, is_online=True, is_deleted=False).values_list("id", flat=True))
                click_topic_tag_list.extend(tag_list)
            logging.info("query tag attention,positive tag_list,device_id:%s,query_name:%s,tag_list:%s" % (
                str(device_id), tag_name, str(click_topic_tag_list)))

        logging.info("click_topic_tag_list:%s" % (str(click_topic_tag_list)))

        reward = 1  # a click is a positive observation
        for tag_id in click_topic_tag_list:
            self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)

        # Refresh recommendations only after the tag updates above have been stored.
        if click_topic_tag_list:
            self.update_recommend_tag_list(device_id, user_feature, user_id, click_topic_tag_list=click_topic_tag_list)

    def _handle_exposure_msg(self, raw_val_dict, user_feature):
        """Apply reward 0 to the tags of every topic card reported as exposed."""
        exposure_cards_list = self._parse_list_param(raw_val_dict["params"]["exposure_cards"])
        device_id = raw_val_dict["device"]["device_id"]
        user_id = raw_val_dict.get("user_id")
        logging.info("type msg:%s" % raw_val_dict.get("type"))

        exposure_topic_id_list = list()
        for item in exposure_cards_list:
            if "card_id" not in item:
                continue
            exposure_topic_id = item["card_id"]
            logging.info("consume exposure topic_id:%s,device_id:%s" % (str(exposure_topic_id), str(device_id)))
            if exposure_topic_id:
                exposure_topic_id_list.append(exposure_topic_id)

        exposure_rows = list(TopicTag.objects.using(settings.SLAVE_DB_NAME).
                             filter(topic_id__in=exposure_topic_id_list).
                             values_list("topic_id", "tag_id", "is_online", "is_collection"))

        # One batched Tag lookup instead of one query per TopicTag row.
        online_tag_ids = {tag_id for _, tag_id, is_online, _ in exposure_rows if is_online}
        ai_tag_ids = set()
        if online_tag_ids:
            ai_tag_ids = {tag_pk for tag_pk, is_ai in Tag.objects.using(settings.SLAVE_DB_NAME).filter(
                id__in=list(online_tag_ids)).values_list("id", "is_ai") if is_ai == 1}
        tag_list = self._select_exposure_tag_ids(exposure_rows, ai_tag_ids)

        # topic -> tag ids (online or not); used only for logging.
        topic_tag_id_dict = dict()
        for topic_id, tag_id, _, _ in exposure_rows:
            topic_tag_id_dict.setdefault(topic_id, list()).append(tag_id)

        logging.info("negative tag_list,device_id:%s,topic_tag_id_dict:%s" % (
            str(device_id), str(topic_tag_id_dict)))

        reward = 0  # shown but not clicked
        for tag_id in tag_list:
            self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)

        # Refresh recommendations only after the tag updates above have been stored.
        self.update_recommend_tag_list(device_id, user_feature, user_id)

    def _handle_interest_choice_msg(self, raw_val_dict, user_feature):
        """Reward (1) the tags chosen on the interest-choice page and refresh recommendations."""
        tagid_list = self._parse_list_param(raw_val_dict["params"]["tagid_list"])
        logging.info("type msg:%s" % raw_val_dict.get("type"))
        device_id = raw_val_dict["device"]["device_id"]
        user_id = raw_val_dict.get("user_id")

        if not tagid_list:
            return

        reward = 1
        for tag_id in tagid_list:
            self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)

        # Refresh recommendations only after the tag updates above have been stored.
        self.update_recommend_tag_list(device_id, user_feature, user_id, new_user_click_tag_list=tagid_list)

    def _consume_one_msg(self, ori_msg, user_feature):
        """Decode one Kafka record and dispatch it by its ``type``; errors are logged, never raised."""
        try:
            logging.info(ori_msg)
            raw_val_dict = json.loads(ori_msg.value)
            msg_type = raw_val_dict.get("type")

            if msg_type in ("on_click_feed_topic_card", "tag_zone_click_focus"):
                self._handle_click_msg(raw_val_dict, user_feature)
            elif msg_type == "page_precise_exposure":
                self._handle_exposure_msg(raw_val_dict, user_feature)
            elif msg_type == "interest_choice_click_next":
                self._handle_interest_choice_msg(raw_val_dict, user_feature)
            else:
                logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
        except Exception:
            # One bad record must not stop the consumer loop.
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())

    def consume_data_from_kafka(self, topic_name=None):
        """Poll Kafka forever and process every record.

        :param topic_name: topic for the shared consumer (see KafkaManager).
        :return: never returns normally; False if creating the consumer or polling
            raises (the error is logged). KeyboardInterrupt/SystemExit propagate.
        """
        try:
            user_feature = [1, 1]

            kafka_consumer_obj = KafkaManager.get_kafka_consumer_ins(topic_name)
            while True:
                msg_dict = kafka_consumer_obj.poll(timeout_ms=100)
                for consume_msg in msg_dict.values():
                    for ori_msg in consume_msg:
                        self._consume_one_msg(ori_msg, user_feature)
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False