# -*- coding: UTF-8 -*-
from celery import shared_task
from django.conf import settings
import datetime

from data_sync.type_info import get_type_info_map
from data_sync.diary.tasks import update_extra_info
from data_sync.user.user import sync_user_level
from qa.models.answer import Answer
from search.models import MixIndex, MixIndexTag
from talos.models.topic import Problem, Article
from gm_types.gaia import INDEX_CARD_TYPE
from gm_types.mimas.qa import CONTENT_CLASS
import logging


@shared_task
def write_to_es(es_type, pk_list):
    """Write the records identified by ``pk_list`` into the ES index.

    :param es_type: key into the map returned by ``get_type_info_map()``;
        an unknown key raises ``KeyError``.
    :param pk_list: iterable of primary keys. Duplicates are dropped before
        the write; the resulting order is unspecified.
    """
    pk_list = list(frozenset(pk_list))
    type_info_map = get_type_info_map()
    type_info = type_info_map[es_type]
    # Lazy %-style arguments: the message is only rendered if it is emitted.
    logging.info("get pk_list:%s", pk_list)
    type_info.insert_table_by_pk_list(
        index_prefix=settings.ES_INDEX_PREFIX,
        pk_list=pk_list,
    )


class AnswerScore(object):
    """Compute the score of an answer.

    The rules are similar to ``data_sync.question.tran2es.Score``; see
    http://wiki.wanmeizhensuo.com/pages/viewpage.action?pageId=4441797
    """

    # (inclusive upper bound, score) steps shared by the like and reply
    # ladders; any count above the last bound scores 100.
    _COUNT_SCORE_STEPS = (
        (5, 10),
        (20, 20),
        (50, 30),
        (70, 60),
        (100, 70),
    )

    @classmethod
    def get_score(cls, answer):
        """Return the non-negative score of ``answer``.

        score = 0.8 * content + 0.2 * social - time decay, floored at 0.0.
        The time decay grows with the age (in hours) of both the question
        and the answer.
        """
        now = datetime.datetime.now()
        vote_num = answer.answervote_set.filter(is_fake=False).count()
        content_score = cls.get_answer_content_score(answer.level)
        social_score = cls.get_social_score(vote_num, answer.replys.count())

        question_hours = cls._hours_since(now, answer.question.create_time)
        answer_hours = cls._hours_since(now, answer.create_time)
        time_score = question_hours * 0.03 * 0.7 + answer_hours * 0.06 * 1.5

        answer_score = 0.8 * content_score + 0.2 * social_score - time_score
        return max(0.0, answer_score)

    @staticmethod
    def _hours_since(now, moment):
        """Return the hours elapsed from ``moment`` to ``now`` (never negative).

        ``timedelta.seconds`` only holds the sub-day remainder, so using it
        would make the age wrap around every 24 hours; ``total_seconds()``
        covers the whole interval. A timestamp in the future counts as age 0.
        """
        return max(0.0, (now - moment).total_seconds() / 3600)

    @staticmethod
    def get_answer_content_score(level):
        """Map the answer's content level onto a content score."""
        if level < 2:
            return 0
        if level < 3:
            return 5
        if level < 4:
            return 10
        if level < 5:
            return 70
        return 100

    @staticmethod
    def get_social_score(likes_num, reply_num):
        """Combine the like score (weight 0.4) and reply score (weight 0.6)."""
        likes_score = AnswerScore.get_likes_score(likes_num)
        reply_score = AnswerScore.get_reply_score(reply_num)
        return 0.4 * likes_score + 0.6 * reply_score

    @staticmethod
    def _score_by_count(count):
        """Return the score of the first step whose upper bound covers ``count``."""
        for upper_bound, score in AnswerScore._COUNT_SCORE_STEPS:
            if count <= upper_bound:
                return score
        return 100

    @staticmethod
    def get_likes_score(likes_num):
        """Map a like count onto a score."""
        return AnswerScore._score_by_count(likes_num)

    @staticmethod
    def get_reply_score(reply_num):
        """Map a reply count onto a score."""
        return AnswerScore._score_by_count(reply_num)


def _sync_mix_index_tags(mix_obj, new_tags):
    """Make the MixIndexTag rows of ``mix_obj`` match ``new_tags``.

    Creates a row for every tag that is not linked yet and deletes the rows
    whose tag is no longer present in ``new_tags``.
    """
    linked = {tag.tag_id: tag for tag in mix_obj.mixindextag_set.all()}
    wanted_ids = set()
    for tag in new_tags:
        if tag.tag_id not in linked:
            # New tag relation.
            MixIndexTag.objects.create(mix_index=mix_obj, tag_id=tag.tag_id)
        # Record every wanted tag, not only the newly created ones; otherwise
        # the cleanup below would delete links that are still valid.
        wanted_ids.add(tag.tag_id)

    # Drop the relations to tags that are no longer associated.
    for stale_id in linked.keys() - wanted_ids:
        linked[stale_id].delete()


def _create_mix_index_tags(mix_obj, new_tags):
    """Bulk-insert one MixIndexTag row per tag for a freshly created ``mix_obj``."""
    MixIndexTag.objects.bulk_create(
        [MixIndexTag(mix_index=mix_obj, tag_id=tag.tag_id) for tag in new_tags]
    )


def _answer_is_indexable(answer):
    """Return True when the answer qualifies for a MixIndex record.

    The answer must be at least ``CONTENT_CLASS.FINE`` and both the answer
    and its question must be online.
    """
    return (
        answer.level >= CONTENT_CLASS.FINE
        and answer.is_online
        and answer.question.is_online
    )


def _update_answer_knowledge(pk_list):
    """Create, refresh or delete the MixIndex records of the given answers."""
    answers = Answer.objects.filter(id__in=pk_list)
    indexed = MixIndex.objects.filter(
        original_id__in=pk_list,
        original_type=INDEX_CARD_TYPE.ANSWER,
    )
    indexed_by_id = {mix.original_id: mix for mix in indexed}

    for item in answers:
        new_tags = item.question.tags
        mix_obj = indexed_by_id.get(item.id)

        if mix_obj is not None:
            if not _answer_is_indexable(item):
                mix_obj.delete()
                continue

            # Recompute the score only for recommended answers; otherwise
            # the stored score is kept.
            if item.is_recommend:
                mix_obj.answer_score = AnswerScore.get_score(item)
            mix_obj.answer_is_recommend = item.is_recommend
            mix_obj.save()

            # Update tags.
            _sync_mix_index_tags(mix_obj, new_tags)
            continue

        if not _answer_is_indexable(item):
            continue

        answer_score = AnswerScore.get_score(item) if item.is_recommend else 0.0
        # Insert a new record.
        mix_obj = MixIndex.objects.create(
            original_id=item.id,
            original_type=INDEX_CARD_TYPE.ANSWER,
            answer_score=answer_score,
            original_create_time=item.create_time,
            answer_is_recommend=item.is_recommend,
        )
        _create_mix_index_tags(mix_obj, new_tags)


def _update_article_knowledge(pk_list):
    """Create, refresh or delete the MixIndex records of the given articles."""
    articles = Article.objects.filter(id__in=pk_list)
    indexed = MixIndex.objects.filter(
        original_id__in=pk_list,
        original_type=INDEX_CARD_TYPE.ARTICLE,
    )
    indexed_by_id = {mix.original_id: mix for mix in indexed}

    for item in articles:
        # Tags are looked up before the online check; when the article is
        # offline the branches below `continue` without touching tags.
        new_tags = Problem.objects.get(id=item.article_id).problemtag_set.all()
        mix_obj = indexed_by_id.get(item.id)

        if mix_obj is not None:
            if not item.is_online:
                # TODO confirm: are the related MixIndexTag rows removed
                # together with the record (foreign-key cascade)?
                mix_obj.delete()
                continue

            # Update tags.
            _sync_mix_index_tags(mix_obj, new_tags)
            continue

        if not item.is_online:
            continue

        # Insert a new record.
        mix_obj = MixIndex.objects.create(
            original_id=item.id,
            original_type=INDEX_CARD_TYPE.ARTICLE,
            original_create_time=item.created_time,
        )
        _create_mix_index_tags(mix_obj, new_tags)


@shared_task
def update_knowledge(model_type, pk_list):
    """Synchronise knowledge data into MixIndex / MixIndexTag.

    :param model_type: ``'answer'`` or ``'article'``; any other value is
        ignored.
    :param pk_list: primary keys of the changed answers / articles; nothing
        happens when it is empty.
    :return: None
    """
    if not pk_list:
        return

    if model_type == 'answer':
        _update_answer_knowledge(pk_list)
    elif model_type == 'article':
        _update_article_knowledge(pk_list)


@shared_task
def sync_diary_extra_info(diary_ids: list):
    """Synchronise diary-related statistics.

    Updates the statistics table for the changed diary ids. Per the original
    note this covers the total vote count; the total post count is updated
    elsewhere and the total reply count is maintained through a field in
    api_diary.
    """
    if not diary_ids:
        return

    update_extra_info(diary_ids)


@shared_task
def sync_user_level_to_gaia(user_ids: list):
    """Synchronise user level information to gaia."""
    sync_user_level(user_ids)