from collections import namedtuple from datetime import datetime from celery import shared_task from utils.rpc import ( rpc_client, logging_exception, ) from gm_types.push import PUSH_INFO_TYPE, AUTOMATED_PUSH from talos.services import UserService from talos.cache.gaia import push_cache from utils.push import push_task_to_user_multi from utils.protocol import gm_protocol from qa.libs import _get_content_text from qa.models import ( Answer, QualityQuestionRead, QualityUserQuestion, QualityAnswerRead, QualityQuestion, QualityAuthorAnswer, ) from communal.tasks import intelligent_push_task @shared_task def write_unread_data(quality_question_id, answer_id=0, _type=None): """ 用户提问/作者回复 写未读记录 :param answer_id: :param quality_question_id: :param _type: :return: """ nt = namedtuple('_TYPE', ['ASK_QUESTION', 'CREATE_ANSWER']) _TYPE = nt(ASK_QUESTION='ask_question', CREATE_ANSWER='create_answer') if not (_type and _type in _TYPE): return if _type == _TYPE.ASK_QUESTION: try: answer = Answer.objects.get(id=answer_id) except Answer.DoesNotExist: return QualityQuestionRead.objects.create( quality_question_id=quality_question_id, user_id=answer.user_id, is_read=False, ) return elif _type == _TYPE.CREATE_ANSWER: asked_question_users = list(QualityUserQuestion.objects.filter( quality_question_id=quality_question_id, is_online=True ).values_list('user_id', flat=True)) if not asked_question_users: return answer_read_objects = [] for user_id in asked_question_users: qar = QualityAnswerRead( quality_question_id=quality_question_id, user_id=user_id, is_read=False, ) answer_read_objects.append(qar) QualityAnswerRead.objects.bulk_create(answer_read_objects) return @shared_task def questioning_push(user_id, answer_id, quality_question_id, question_title): """追问push给作者。 :param user_id: 追问人user_id :param answer_id: 追问的回答id :param quality_question_id: 追问id :param question_title: 追问内容 :return: """ try: answer = Answer.objects.get(id=answer_id) except Answer.DoesNotExist: return # 每一个回答,一天作者只收到一次push cache_key_tpl = "question:{date}:{user_id}:{answer_id}" push_user_id = answer.user_id cache_key = cache_key_tpl.format(date=str(datetime.now().date()), user_id=push_user_id, answer_id=answer_id) if push_cache.get(cache_key): return # 作者对追问已经进行回复,这个追问(其他追问可以)不再产生push(时间无线) has_author_answer = QualityAuthorAnswer.objects.filter(quality_question_id=quality_question_id).exists() if has_author_answer: return user = UserService.get_user_by_user_id(user_id=user_id) content = question_title if len(question_title) <= 30 else question_title[:30] + "..." push_url = gm_protocol.get_user_answer_list() intelligent_push_task.apply_async( args=( answer_id, [push_user_id], AUTOMATED_PUSH.QUALITY_QUESTION, { 'type': PUSH_INFO_TYPE.GM_PROTOCOL, 'pushUrl': push_url, 'push_url': push_url, } ), kwargs={ "platform": None, "alert": content, "others": { "title": "@{} 希望向你请教".format(user.nickname), "alert": content, }, "labels": { 'event_type': 'push', 'event': 'quality_question_received_questioning' }, }, ) push_cache.set(cache_key, 1) today = datetime.today() push_cache.expire(cache_key, (today.replace(hour=23, minute=59, second=59) - today).seconds) @shared_task def answer_push(user_id, quality_question_id): """作者回复提问,push给追问用户""" try: quality_question = QualityQuestion.objects.get(id=quality_question_id, is_online=True) except QualityQuestion.DoesNotExist: return try: answer = Answer.objects.get(id=quality_question.answer_id, is_online=True) except Answer.DoesNotExist: return user_ids = list( QualityUserQuestion.objects.filter( quality_question_id=quality_question_id, is_online=True ).values_list("user_id", flat=True) ) if not user_ids: return # 校验同一追问是否推送过 cache_key_tpl = "answer:{date}:{answer_id}" cache_key = cache_key_tpl.format(date=str(datetime.now().date()), answer_id=answer.id) push_user_ids = [] for user_id, has_push in zip(user_ids, push_cache.hmget(cache_key, user_ids)): if not has_push: push_user_ids.append(user_id) if not push_user_ids: return user = UserService.get_user_by_user_id(user_id=user_id) content = _get_content_text(answer.content) content = content if len(content) <= 30 else content[:30] + "..." push_msg = "「{user_name}」回答了你在「{content}」下的提问,来看看她是怎么说的~".format(user_name=user.nickname, content=content) push_type = AUTOMATED_PUSH.QUALITY_QUESTION push_url = gm_protocol.get_user_answer_list() extra = { 'type': PUSH_INFO_TYPE.GM_PROTOCOL, 'msgType': 4, 'pushUrl': push_url, 'push_url': push_url, } push_task_to_user_multi( user_ids=push_user_ids, push_type=push_type, extra=extra, labels={'event_type': 'push', 'event': 'quality_question_received_answer'}, alert=push_msg, ) push_cache.hmset(cache_key, {user_id: 1 for user_id in push_user_ids}) today = datetime.today() push_cache.expire(cache_key, (today.replace(hour=23, minute=59, second=59) - today).seconds)