tasks.py 7.75 KB
# -*- coding: UTF-8 -*-
from celery import shared_task
from django.conf import settings
import datetime
from data_sync.type_info import get_type_info_map
from data_sync.diary.tasks import update_extra_info
from data_sync.user.user import sync_user_level

from qa.models.answer import Answer
from search.models import MixIndex, MixIndexTag
from talos.models.topic import Problem, Article

from gm_types.gaia import INDEX_CARD_TYPE
from gm_types.mimas.qa import CONTENT_CLASS
import logging


@shared_task
def write_to_es(es_type, pk_list):
    """Re-index the rows identified by *pk_list* through the handler for *es_type*.

    :param es_type: key looked up in ``get_type_info_map()`` to select the
        type handler (an unknown key raises from that lookup — presumably
        ``KeyError`` if the map is a dict; confirm).
    :param pk_list: iterable of primary keys; duplicates are dropped before
        indexing (order is not preserved).
    :return: None
    """
    # De-duplicate so the same primary key is not indexed twice.
    pk_list = list(frozenset(pk_list))
    type_info = get_type_info_map()[es_type]
    # Lazy %-args: the (possibly long) list is only formatted if INFO is enabled.
    logging.info("get pk_list:%s", pk_list)
    type_info.insert_table_by_pk_list(
        index_prefix=settings.ES_INDEX_PREFIX,
        pk_list=pk_list,
    )


class AnswerScore(object):
    '''
    Compute the ranking score of an answer. The rules are similar to
    data_sync.question.tran2es.Score; see
    http://wiki.wanmeizhensuo.com/pages/viewpage.action?pageId=4441797

    score = max(0, 0.8 * content_score + 0.2 * social_score - time_score)
    '''

    # (inclusive upper bound, score) tiers shared by likes and replies;
    # counts above the last bound get _SOCIAL_TOP_SCORE.
    _SOCIAL_TIERS = ((5, 10), (20, 20), (50, 30), (70, 60), (100, 70))
    _SOCIAL_TOP_SCORE = 100

    @classmethod
    def get_score(cls, answer, now=None):
        '''
        Return the non-negative score of *answer*.

        :param answer: answer object; reads ``level``, ``create_time``,
            ``question.create_time``, the non-fake votes in
            ``answervote_set`` and the ``replys`` count.
        :param now: reference time for the age penalty; defaults to
            ``datetime.datetime.now()``. Must be the same naive/aware kind
            as the stored create times.
        :return: float >= 0.0
        '''
        if now is None:
            now = datetime.datetime.now()

        vote_num = answer.answervote_set.filter(is_fake=False).count()
        content_score = cls.get_answer_content_score(answer.level)
        social_score = cls.get_social_score(vote_num, answer.replys.count())

        # Age penalty, measured in hours. total_seconds() is required here:
        # timedelta.seconds is only the sub-day remainder (0..86399), which
        # would reset the penalty every 24 hours and ignore whole days.
        question_hours = (now - answer.question.create_time).total_seconds() / 3600
        answer_hours = (now - answer.create_time).total_seconds() / 3600
        time_score = question_hours * 0.03 * 0.7 + answer_hours * 0.06 * 1.5

        answer_score = 0.8 * content_score + 0.2 * social_score - time_score
        return max(0.0, answer_score)

    @staticmethod
    def get_answer_content_score(level):
        '''Map an answer quality level to a content score (0, 5, 10, 70 or 100).'''
        if level < 2:
            return 0
        elif level < 3:
            return 5
        elif level < 4:
            return 10
        elif level < 5:
            return 70
        else:
            return 100

    @staticmethod
    def get_social_score(likes_num, reply_num):
        '''Weighted social score: 40% likes tier + 60% replies tier.'''
        likes_score = AnswerScore.get_likes_score(likes_num)
        reply_score = AnswerScore.get_reply_score(reply_num)
        return 0.4 * likes_score + 0.6 * reply_score

    @classmethod
    def _social_tier_score(cls, count):
        '''Return the score of the first tier whose upper bound is >= *count*.'''
        for upper_bound, score in cls._SOCIAL_TIERS:
            if count <= upper_bound:
                return score
        return cls._SOCIAL_TOP_SCORE

    @staticmethod
    def get_likes_score(likes_num):
        '''Tier score for the number of (non-fake) likes.'''
        return AnswerScore._social_tier_score(likes_num)

    @staticmethod
    def get_reply_score(reply_num):
        '''Tier score for the number of replies.'''
        return AnswerScore._social_tier_score(reply_num)


@shared_task
def update_knowledge(model_type, pk_list):
    '''
    Sync knowledge content (answers or articles) into ``MixIndex``.

    For each given primary key, the corresponding ``MixIndex`` row and its
    ``MixIndexTag`` rows are created, updated or deleted so that they reflect
    the current state of the source object.

    :param model_type: ``'answer'`` or ``'article'``; any other value is a no-op.
    :param pk_list: primary keys of the changed objects; empty/None is a no-op.
    :return: None
    '''
    if not pk_list:
        return

    if model_type == 'answer':
        _update_answer_knowledge(pk_list)
    elif model_type == 'article':
        _update_article_knowledge(pk_list)


def _tag_ids_of(tags):
    '''Return the distinct ``tag_id`` values of *tags*, in first-seen order.'''
    return list(dict.fromkeys(tag.tag_id for tag in tags))


def _sync_mix_index_tags(mix_obj, tag_ids):
    '''Add/remove ``MixIndexTag`` rows so *mix_obj* is linked to exactly *tag_ids*.'''
    old_tags_dict = {tag.tag_id: tag for tag in mix_obj.mixindextag_set.all()}

    for tag_id in tag_ids:
        if tag_id not in old_tags_dict:
            # Per-row create() (rather than bulk_create) keeps the original
            # code path; bulk_create skips save() and model signals.
            MixIndexTag.objects.create(mix_index=mix_obj, tag_id=tag_id)

    # Drop links to tags that are no longer associated.
    for tag_id in old_tags_dict.keys() - set(tag_ids):
        old_tags_dict[tag_id].delete()


def _create_mix_index_tags(mix_obj, tag_ids):
    '''Bulk-insert ``MixIndexTag`` rows linking a freshly created *mix_obj* to *tag_ids*.'''
    MixIndexTag.objects.bulk_create(
        [MixIndexTag(mix_index=mix_obj, tag_id=tag_id) for tag_id in tag_ids]
    )


def _update_answer_knowledge(pk_list):
    '''Create, update or delete answer ``MixIndex`` rows for the given answer ids.'''
    answers = Answer.objects.filter(id__in=pk_list)
    mix_by_answer_id = {
        mix.original_id: mix
        for mix in MixIndex.objects.filter(
            original_id__in=pk_list, original_type=INDEX_CARD_TYPE.ANSWER
        )
    }

    for item in answers:
        # Only online answers of at least FINE quality under an online
        # question are kept in the index.
        hidden = (item.level < CONTENT_CLASS.FINE
                  or not item.is_online
                  or not item.question.is_online)
        mix_obj = mix_by_answer_id.get(item.id)

        if mix_obj is not None:
            if hidden:
                # NOTE(review): whether related MixIndexTag rows are removed
                # depends on the FK's on_delete setting — confirm.
                mix_obj.delete()
                continue

            # Recompute the score only for recommended answers; otherwise
            # keep the stored score.
            if item.is_recommend:
                mix_obj.answer_score = AnswerScore.get_score(item)
            mix_obj.answer_is_recommend = item.is_recommend
            mix_obj.save()

            _sync_mix_index_tags(mix_obj, _tag_ids_of(item.question.tags))
        else:
            if hidden:
                continue

            answer_score = AnswerScore.get_score(item) if item.is_recommend else 0.0
            mix_index_obj = MixIndex.objects.create(
                original_id=item.id,
                original_type=INDEX_CARD_TYPE.ANSWER,
                answer_score=answer_score,
                original_create_time=item.create_time,
                answer_is_recommend=item.is_recommend,
            )
            _create_mix_index_tags(mix_index_obj, _tag_ids_of(item.question.tags))


def _update_article_knowledge(pk_list):
    '''Create, update or delete article ``MixIndex`` rows for the given article ids.'''
    articles = Article.objects.filter(id__in=pk_list)
    mix_by_article_id = {
        mix.original_id: mix
        for mix in MixIndex.objects.filter(
            original_id__in=pk_list, original_type=INDEX_CARD_TYPE.ARTICLE
        )
    }

    for item in articles:
        mix_obj = mix_by_article_id.get(item.id)

        if not item.is_online:
            # Offline articles are removed from (or never added to) the index.
            if mix_obj is not None:
                # NOTE(review): whether related MixIndexTag rows are removed
                # depends on the FK's on_delete setting — confirm.
                mix_obj.delete()
            continue

        # Tags come from the Problem referenced by item.article_id. They are
        # only fetched for online articles, which are the only ones that use
        # them; Problem.DoesNotExist still propagates as before.
        tag_ids = _tag_ids_of(
            Problem.objects.get(id=item.article_id).problemtag_set.all()
        )

        if mix_obj is not None:
            _sync_mix_index_tags(mix_obj, tag_ids)
        else:
            mix_index_obj = MixIndex.objects.create(
                original_id=item.id,
                original_type=INDEX_CARD_TYPE.ARTICLE,
                original_create_time=item.created_time,
            )
            _create_mix_index_tags(mix_index_obj, tag_ids)


@shared_task
def sync_diary_extra_info(diary_ids: list):
    """Sync the aggregated statistics of the changed diaries.

    Per the original notes, of the three diary counters only the like total
    is refreshed from here: post totals are updated elsewhere, and reply
    totals are maintained through a field in api_diary.

    :param diary_ids: ids of the diaries that changed; empty/None is a no-op.
    """
    if diary_ids:
        update_extra_info(diary_ids)


@shared_task
def sync_user_level_to_gaia(user_ids: list):
    """Sync the level information of the given users to gaia.

    :param user_ids: ids of the users to sync; empty/None is a no-op, matching
        the empty-input guard of ``sync_diary_extra_info`` in this module.
    """
    if not user_ids:
        return

    sync_user_level(user_ids)