#!/usr/bin/env python
# -*- coding: utf-8 -*-
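"""
Management command that dumps model data to Elasticsearch in parallel and
hosts several auxiliary sync jobs (face-similarity cache, LinUCB data
collection, official tag-name set, new-topic impression subscriber).

Typical invocations (assuming the file sits in a Django app's
management/commands/ directory):

    python manage.py trans2es_data2es_parallel --type all
    python manage.py trans2es_data2es_parallel --sync_type tagname
"""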

import django.db.models
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
import traceback
import logging
import six
import sys

from libs.es import ESPerform
import trans2es.models as md
from trans2es.utils import topic_transfer
from libs.table_scan import TableSlicer,TableSlicerChunk
from trans2es.type_info import get_type_info_map,TypeInfo

from libs.cache import redis_client
from trans2es.models.face_user_contrast_similar import FaceUserContrastSimilar
import json

from search.utils.topic import TopicUtils
from trans2es.models.pick_topic import PickTopic
from trans2es.models.tag import TopicTag,Tag
from trans2es.models.user_extra import UserExtra
from trans2es.models.group import Group
from trans2es.models.topic import Topic,ActionSumAboutTopic
from search.utils.common import *
from linucb.views.collect_data import CollectData
from injection.data_sync.tasks import sync_user_similar_score
import datetime

from libs.error import logging_exception
from trans2es.models.portrait_stat import LikeTopicStat



class Job(object):
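    """Write one TableSlicerChunk of a given type into its Elasticsearch sub-index.

    Instances are plain callables, so they can be executed directly or handed
    to a worker pool.
    """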
    __es = None

    def __init__(self, sub_index_name, type_name, chunk):
        assert isinstance(sub_index_name, six.string_types)
        assert isinstance(type_name, six.string_types)
        assert isinstance(chunk, TableSlicerChunk)
        self._sub_index_name = sub_index_name
        self._type_name = type_name
        self._chunk = chunk

    @classmethod
    def get_es(cls):
        if cls.__es is None:
            cls.__es = ESPerform().get_cli()
        return cls.__es

    def __call__(self):
        type_info = get_type_info_map()[self._type_name]
        assert isinstance(type_info, TypeInfo)

        return type_info.insert_table_chunk(
            sub_index_name=self._sub_index_name,
            table_chunk=self._chunk,
            es=self.get_es(),
        )


class SyncDataToRedis(object):

    @classmethod
    def sync_face_similar_data_to_redis(cls):
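        """Cache each participant's face-similarity matches (similarity > 0.3,
        best first) in Redis under physical:user_similar:participant_user_id:<id>.
        """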
        try:
            result_items = FaceUserContrastSimilar.objects.filter(
                is_online=True, is_deleted=False
            ).values_list("participant_user_id", flat=True).distinct()

            logging.info("duan add,begin sync_face_similar_data_to_redis!")

            redis_key_prefix = "physical:user_similar:participant_user_id:"
            for participant_user_id in result_items:

                redis_key = redis_key_prefix + str(participant_user_id)
                similar_result_items = FaceUserContrastSimilar.objects.filter(is_online=True, is_deleted=False,
                                                                              participant_user_id=participant_user_id,
                                                                              similarity__gt=0.3).order_by(
                    "-similarity")

                item_list = [
                    {"contrast_user_id": item.contrast_user_id, "similarity": item.similarity}
                    for item in similar_result_items
                ]
                redis_client.set(redis_key, json.dumps(item_list))

                logging.info("duan add,participant_user_id:%d set data done!" % participant_user_id)
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())


class Command(BaseCommand):

    args = ''
    help = 'dump data to elasticsearch, parallel'

    # legacy optparse-style options (Django's pre-1.8 option_list interface)
    from optparse import make_option

    option_list = BaseCommand.option_list + (
        make_option('-t', '--type', dest='type', help='type name to dump data to elasticsearch', metavar='TYPE', default=''),
        make_option('-i', '--index-prefix', dest='index_prefix', help='index name to dump data to elasticsearch', metavar='INDEX_PREFIX'),
        make_option('-p', '--parallel', dest='parallel', help='parallel process count', metavar='PARALLEL'),
        make_option('-s', '--pks', dest='pks', help='specify sync pks, comma separated', metavar='PKS', default=''),
        make_option('--streaming-slicing', dest='streaming_slicing', action='store_true', default=True),
        make_option('--no-streaming-slicing', dest='streaming_slicing', action='store_false', default=True),
        make_option('-S', '--sync_type', dest='sync_type', help='sync data to es', metavar='TYPE', default=''),
        make_option('-T', '--test_score', dest='test_score', help='test_score', metavar='TYPE', default=''),
    )

    def __sync_data_by_type(self, type_name):
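        """Slice the queryset registered for type_name into chunks and run a Job per chunk."""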
        try:
            type_info = get_type_info_map()[type_name]
            query_set = type_info.queryset

            slicer = TableSlicer(queryset=query_set, chunk_size=type_info.bulk_insert_chunk_size)
            for chunk in slicer.chunks():
                job = Job(
                    sub_index_name=type_name,
                    type_name=type_name,
                    chunk=chunk,
                )
                job()
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())

    def generate_topic_score_detail(self):
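        """Recompute each recommended topic's offline score component by
        component and log the breakdown for score debugging (test_score).
        """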
        try:
            # 241432787 is a hard-coded user id used only for this score-debugging path
            topic_id_dict = TopicUtils.get_recommend_topic_ids(241432787, 0, 0, 500,
                                                               query_type=TopicPageType.HOME_RECOMMEND, test_score=True)
            for topic_id in topic_id_dict:

                offline_score = 0.0
                user_is_shadow = False

                topic_sql_item = Topic.objects.filter(id=topic_id)
                if not topic_sql_item:
                    # skip ids that no longer resolve to a topic
                    continue

                user_is_recommend = 0.0
                # whether the topic author is an officially recommended user
                user_query_results = UserExtra.objects.filter(user_id=topic_sql_item[0].user_id)
                if user_query_results.count() > 0:
                    if user_query_results[0].is_recommend:
                        offline_score += 2.0
                        user_is_recommend = 2.0
                    elif user_query_results[0].is_shadow:
                        user_is_shadow = True

                group_is_recommend = 0.0
                # whether the topic's group is officially recommended
                # if topic_sql_item[0].group and topic_sql_item[0].group.is_recommend:
                #     offline_score += 4.0
                #     group_is_recommend = 4.0

                topic_level_score = 0.0
                # score by topic content level
                if topic_sql_item[0].content_level == '5':
                    offline_score += 6.0
                    topic_level_score = 6.0
                elif topic_sql_item[0].content_level == '4':
                    offline_score += 5.0
                    topic_level_score = 5.0
                elif topic_sql_item[0].content_level == '3':
                    offline_score += 2.0
                    topic_level_score = 2.0

                exposure_count = ActionSumAboutTopic.objects.filter(topic_id=topic_id, data_type=1).count()
                click_count = ActionSumAboutTopic.objects.filter(topic_id=topic_id, data_type=2).count()
                uv_num = ActionSumAboutTopic.objects.filter(topic_id=topic_id, data_type=3).count()

                exposure_score = 0.0
                uv_score = 0.0
                if exposure_count > 0:
                    # cast to float so Python 2 integer division does not truncate the CTR to 0
                    exposure_score = float(click_count) / exposure_count
                    offline_score += exposure_score
                if uv_num > 0:
                    uv_score = float(topic_sql_item[0].vote_num) / uv_num + float(topic_sql_item[0].reply_num) / uv_num
                    offline_score += uv_score

                """
                    1:马甲账号是否对总分降权?
                """
                if user_is_shadow:
                    offline_score = offline_score * 0.5

                logging.info("test_score######topic_id:%d,score:%f,offline_score:%f,user_is_recommend:%f,group_is_recommend:%f,topic_level_score:%f,exposure_score:%f,uv_score:%f"
                             % (topic_id,topic_id_dict[topic_id][2],offline_score,user_is_recommend,group_is_recommend,topic_level_score,exposure_score,uv_score))
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())


    def sync_tag_collection_name_set(self):
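        """Collect the names of online collection tags (non-category, not
        deleted) into the physical:official_tag_name_set Redis set.
        """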
        try:
            collection_redis_key_name = "physical:official_tag_name_set"
            tag_list = list(TopicTag.objects.filter(is_online=True, is_collection=True).values_list("tag_id", flat=True))

            tag_name_list = list(Tag.objects.filter(id__in=tag_list, is_online=True, is_deleted=False,
                                                    is_category=False).values_list("name", flat=True))

            logging.info("tag_list_len:%d,tag_name_list_len:%d" % (len(tag_list), len(tag_name_list)))
            for tag_name in tag_name_list:
                try:
                    # skip empty names and add the full tag name to the set
                    if tag_name:
                        redis_client.sadd(collection_redis_key_name, tag_name)
                except Exception:
                    pass
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())


    def sub_redis_new_topic_ids(self):
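        """Subscribe to the new_topic_impression Redis channel, accumulate
        impression counts per topic, and insert a LikeTopicStat row once a
        topic exceeds 100 impressions.
        """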
        try:
            ps = redis_client.pubsub()
            ps.subscribe("new_topic_impression")
            all_new_topic_impression_count_key = "all_new_topic_impression_count_key"
            for item in ps.listen():
                if item['type'] == 'message':
                    new_topic_ids = json.loads(item["data"])
                    raw_count = redis_client.get(all_new_topic_impression_count_key)
                    all_new_topic_impression_count = json.loads(raw_count) if raw_count else {}
                    insert_topic_ids = []
                    for topic in new_topic_ids:
                        topic = str(topic)
                        if topic in all_new_topic_impression_count:
                            all_new_topic_impression_count[topic] += 1
                            # persist once a topic crosses 100 impressions, then stop counting it
                            if all_new_topic_impression_count[topic] > 100:
                                insert_topic_ids.append(int(topic))
                                all_new_topic_impression_count.pop(topic)
                        else:
                            all_new_topic_impression_count[topic] = 1
                    if insert_topic_ids:
                        now = datetime.datetime.today()
                        insert_list = [
                            LikeTopicStat(create_time=now, update_time=now, topic_id=topic,
                                          is_new_topic=0, topic_ctr_30=0.0, like_rate_30=0.0)
                            for topic in insert_topic_ids
                        ]
                        LikeTopicStat.objects.using(settings.MASTER_DB_NAME).bulk_create(insert_list)
                        logging.info("impressions count gt 100 topic ids:" + str(insert_topic_ids))
                    logging.info("all_new_topic_impression_count:" + str(all_new_topic_impression_count))
                    redis_client.set(all_new_topic_impression_count_key, json.dumps(all_new_topic_impression_count))
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())


    def handle(self, *args, **options):
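        """Dispatch the requested job(s) based on --type, --sync_type and --test_score."""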
        try:
            if options["type"]:
                for type_name in get_type_info_map().keys():
                    if options["type"] == "all" or type_name == options["type"]:
                        logging.info("begin sync [%s] data to es!" % type_name)
                        self.__sync_data_by_type(type_name)

            if len(options["sync_type"]) and options["sync_type"]=="sync_data_to_es":
                SyncDataToRedis.sync_face_similar_data_to_redis()
            if len(options["test_score"]):
                self.generate_topic_score_detail()

            if len(options["sync_type"]) and options["sync_type"]=="linucb":
                collect_obj = CollectData()
                collect_obj.consume_data_from_kafka()

            if len(options["sync_type"]) and options["sync_type"]=="similar":
                sync_user_similar_score()

            if len(options["sync_type"]) and options["sync_type"]=="tagname":
                self.sync_tag_collecction_name_set()

            if len(options["sync_type"]) and options["sync_type"] == "new_topic_sub":
                self.sub_redis_new_topic_ids()

        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())