#!/usr/bin/env python # -*- coding: utf-8 -*- import django.db.models from django.conf import settings from django.core.management.base import BaseCommand, CommandError import traceback import logging import six import sys from libs.es import ESPerform import trans2es.models as md from trans2es.utils import topic_transfer from libs.table_scan import TableSlicer,TableSlicerChunk from trans2es.type_info import get_type_info_map,TypeInfo from libs.cache import redis_client from trans2es.models.face_user_contrast_similar import FaceUserContrastSimilar import json from search.utils.topic import TopicUtils from trans2es.models.pick_topic import PickTopic from trans2es.models.tag import TopicTag,Tag from trans2es.models.user_extra import UserExtra from trans2es.models.group import Group from trans2es.models.topic import Topic,ActionSumAboutTopic from search.utils.common import * from linucb.views.collect_data import CollectData from injection.data_sync.tasks import sync_user_similar_score class Job(object): __es = None def __init__(self, sub_index_name, type_name, chunk): assert isinstance(sub_index_name, six.string_types) assert isinstance(type_name, six.string_types) assert isinstance(chunk, TableSlicerChunk) self._sub_index_name = sub_index_name self._type_name = type_name self._chunk = chunk @classmethod def get_es(cls): if cls.__es is None: cls.__es = ESPerform().get_cli() return cls.__es def __call__(self): type_info = get_type_info_map()[self._type_name] assert isinstance(type_info, TypeInfo) result = type_info.insert_table_chunk( sub_index_name=self._sub_index_name, table_chunk=self._chunk, es=self.get_es(), ) class SyncDataToRedis(object): @classmethod def sync_face_similar_data_to_redis(cls): try: result_items = FaceUserContrastSimilar.objects.filter(is_online=True, is_deleted=False).distinct().values( "participant_user_id").values_list("participant_user_id", flat=True) logging.info("duan add,begin sync_face_similar_data_to_redis!") redis_key_prefix = "physical:user_similar:participant_user_id:" for participant_user_id in result_items: redis_key = redis_key_prefix + str(participant_user_id) similar_result_items = FaceUserContrastSimilar.objects.filter(is_online=True, is_deleted=False, participant_user_id=participant_user_id, similarity__gt=0.3).order_by( "-similarity") item_list = list() for item in similar_result_items: item_list.append( { "contrast_user_id": item.contrast_user_id, "similarity": item.similarity } ) redis_client.set(redis_key, json.dumps(item_list)) logging.info("duan add,participant_user_id:%d set data done!" % participant_user_id) except: logging.error("catch exception,err_msg:%s" % traceback.format_exc()) class Command(BaseCommand): args = '' help = 'dump data to elasticsearch, parallel' from optparse import make_option option_list = BaseCommand.option_list + ( make_option('-t', '--type', dest='type', help='type name to dump data to elasticsearch', metavar='TYPE',default=''), make_option('-i', '--index-prefix', dest='index_prefix', help='index name to dump data to elasticsearch', metavar='INDEX_PREFIX'), make_option('-p', '--parallel', dest='parallel', help='parallel process count', metavar='PARALLEL'), make_option('-s', '--pks', dest='pks', help='specify sync pks, comma separated', metavar='PKS', default=''), make_option('--streaming-slicing', dest='streaming_slicing', action='store_true', default=True), make_option('--no-streaming-slicing', dest='streaming_slicing', action='store_false', default=True), make_option('-S', '--sync_type',dest='sync_type', help='sync data to es',metavar='TYPE',default=''), make_option('-T', '--test_score', dest='test_score', help='test_score', metavar='TYPE', default='') ) def __sync_data_by_type(self, type_name): try: type_info = get_type_info_map()[type_name] query_set = type_info.queryset slicer = TableSlicer(queryset=query_set, chunk_size=type_info.bulk_insert_chunk_size) for chunk in slicer.chunks(): job = Job( sub_index_name=type_name, type_name=type_name, chunk=chunk, ) job() except: logging.error("catch exception,err_msg:%s" % traceback.format_exc()) def generate_topic_score_detail(self): try: topic_id_dict = TopicUtils.get_recommend_topic_ids(241432787,0, 0, 500,query_type=TopicPageType.HOME_RECOMMEND,test_score=True) for topic_id in topic_id_dict: offline_score = 0.0 user_is_shadow = False topic_sql_item = Topic.objects.filter(id=topic_id) user_is_recommend=0.0 # 是否官方推荐用户 user_query_results = UserExtra.objects.filter(user_id=topic_sql_item[0].user_id) if user_query_results.count() > 0: if user_query_results[0].is_recommend: offline_score += 2.0 user_is_recommend = 2.0 elif user_query_results[0].is_shadow: user_is_shadow = True group_is_recommend=0.0 # 是否官方推荐小组 # if topic_sql_item[0].group and topic_sql_item[0].group.is_recommend: # offline_score += 4.0 # group_is_recommend = 4.0 topic_level_score = 0.0 # 帖子等级 if topic_sql_item[0].content_level == '5': offline_score += 6.0 topic_level_score = 6.0 elif topic_sql_item[0].content_level == '4': offline_score += 5.0 topic_level_score = 5.0 elif topic_sql_item[0].content_level == '3': offline_score += 2.0 topic_level_score = 2.0 exposure_count = ActionSumAboutTopic.objects.filter(topic_id=topic_id, data_type=1).count() click_count = ActionSumAboutTopic.objects.filter(topic_id=topic_id, data_type=2).count() uv_num = ActionSumAboutTopic.objects.filter(topic_id=topic_id, data_type=3).count() exposure_score = 0.0 uv_score = 0.0 if exposure_count > 0: offline_score += click_count / exposure_count exposure_score = click_count / exposure_count if uv_num > 0: offline_score += (topic_sql_item[0].vote_num / uv_num + topic_sql_item[0].reply_num / uv_num) uv_score = (topic_sql_item[0].vote_num / uv_num + topic_sql_item[0].reply_num / uv_num) """ 1:马甲账号是否对总分降权? """ if user_is_shadow: offline_score = offline_score * 0.5 logging.info("test_score######topic_id:%d,score:%f,offline_score:%f,user_is_recommend:%f,group_is_recommend:%f,topic_level_score:%f,exposure_score:%f,uv_score:%f" % (topic_id,topic_id_dict[topic_id][2],offline_score,user_is_recommend,group_is_recommend,topic_level_score,exposure_score,uv_score)) except: logging.error("catch exception,err_msg:%s" % traceback.format_exc()) def handle(self, *args, **options): try: type_name_list = get_type_info_map().keys() for type_name in type_name_list: if len(options["type"]): if options["type"] == "all" or type_name==options["type"]: logging.info("begin sync [%s] data to es!" % type_name) self.__sync_data_by_type(type_name) if len(options["sync_type"]) and options["sync_type"]=="sync_data_to_es": SyncDataToRedis.sync_face_similar_data_to_redis() if len(options["test_score"]): self.generate_topic_score_detail() if len(options["sync_type"]) and options["sync_type"]=="linucb": collect_obj = CollectData() collect_obj.consume_data_from_kafka() if len(options["sync_type"]) and options["sync_type"]=="similar": sync_user_similar_score() except: logging.error("catch exception,err_msg:%s" % traceback.format_exc())