# quality_question_task.py (5.98 KB)
from collections import namedtuple
from datetime import datetime

from celery import shared_task
from utils.rpc import (
    rpc_client,
    logging_exception,
)
from gm_types.push import PUSH_INFO_TYPE, AUTOMATED_PUSH

from talos.services import UserService
from talos.cache.gaia import push_cache
from utils.push import push_task_to_user_multi
from utils.protocol import gm_protocol
from qa.libs import _get_content_text
from qa.models import (
    Answer,
    QualityQuestionRead,
    QualityUserQuestion,
    QualityAnswerRead,
    QualityQuestion,
    QualityAuthorAnswer,
)
from communal.tasks import intelligent_push_task


@shared_task
def write_unread_data(quality_question_id, answer_id=0, _type=None):
    """
    Write unread records when a user asks a follow-up question or the author replies.

    :param quality_question_id: id of the follow-up question the unread rows refer to
    :param answer_id: id of the answer being asked about; only read for 'ask_question'
    :param _type: 'ask_question' or 'create_answer'; any other value is a no-op
    :return: None
    """
    ask_question = 'ask_question'
    create_answer = 'create_answer'

    # Ignore empty or unknown event types.
    if not _type or _type not in (ask_question, create_answer):
        return

    if _type == ask_question:
        # A new follow-up question: one unread row for the user who owns the answer.
        try:
            target_answer = Answer.objects.get(id=answer_id)
        except Answer.DoesNotExist:
            return
        QualityQuestionRead.objects.create(
            quality_question_id=quality_question_id,
            user_id=target_answer.user_id,
            is_read=False,
        )
        return

    if _type == create_answer:
        # The author replied: one unread row for every online asker of this question.
        asker_queryset = QualityUserQuestion.objects.filter(
            quality_question_id=quality_question_id, is_online=True
        )
        asker_ids = list(asker_queryset.values_list('user_id', flat=True))
        if not asker_ids:
            return
        unread_rows = [
            QualityAnswerRead(
                quality_question_id=quality_question_id,
                user_id=asker_id,
                is_read=False,
            )
            for asker_id in asker_ids
        ]
        QualityAnswerRead.objects.bulk_create(unread_rows)
        return


@shared_task
def questioning_push(user_id, answer_id, quality_question_id, question_title):
    """Push a follow-up question notification to the author of the answer.

    :param user_id: user id of the person asking the follow-up question
    :param answer_id: id of the answer the follow-up question is about
    :param quality_question_id: id of the follow-up question
    :param question_title: text of the follow-up question
    :return: None
    """

    try:
        answer = Answer.objects.get(id=answer_id)
    except Answer.DoesNotExist:
        return

    # For each answer, the author receives at most one push per day.
    cache_key_tpl = "question:{date}:{user_id}:{answer_id}"
    push_user_id = answer.user_id
    cache_key = cache_key_tpl.format(date=str(datetime.now().date()), user_id=push_user_id, answer_id=answer_id)
    if push_cache.get(cache_key):
        return

    # Once the author has replied to this follow-up question, it never triggers
    # a push again (no time limit); other follow-up questions still can.
    has_author_answer = QualityAuthorAnswer.objects.filter(quality_question_id=quality_question_id).exists()
    if has_author_answer:
        return

    user = UserService.get_user_by_user_id(user_id=user_id)
    # Keep the alert short: at most 30 characters plus an ellipsis.
    content = question_title if len(question_title) <= 30 else question_title[:30] + "..."
    push_url = gm_protocol.get_user_answer_list()
    intelligent_push_task.apply_async(
        args=(
            answer_id, [push_user_id], AUTOMATED_PUSH.QUALITY_QUESTION,
            {
                'type': PUSH_INFO_TYPE.GM_PROTOCOL,
                'pushUrl': push_url,
                'push_url': push_url,
            }
        ),
        kwargs={
            "platform": None,
            "alert": content,
            "others": {
                "title": "@{} 希望向你请教".format(user.nickname),
                "alert": content,
            },
            "labels": {
                'event_type': 'push',
                'event': 'quality_question_received_questioning'
            },
        },
    )

    # Mark this (day, author, answer) as pushed; the marker lives until the end of the day.
    push_cache.set(cache_key, 1)
    now = datetime.today()
    seconds_left = (now.replace(hour=23, minute=59, second=59) - now).seconds
    # During the last second of the day the difference is 0 (microseconds are
    # not reset by replace()). Assuming push_cache is Redis-backed, a
    # non-positive TTL would delete the key immediately and drop the dedup
    # marker, so keep it alive for at least one second.
    push_cache.expire(cache_key, max(seconds_left, 1))


@shared_task
def answer_push(user_id, quality_question_id):
    """Push a notification to the asking users when the author replies.

    :param user_id: id of the user whose nickname appears in the push text
        (presumably the replying author; confirm against the caller)
    :param quality_question_id: id of the follow-up question that was replied to
    :return: None
    """

    try:
        quality_question = QualityQuestion.objects.get(id=quality_question_id, is_online=True)
    except QualityQuestion.DoesNotExist:
        return

    try:
        answer = Answer.objects.get(id=quality_question.answer_id, is_online=True)
    except Answer.DoesNotExist:
        return

    user_ids = list(
        QualityUserQuestion.objects.filter(
            quality_question_id=quality_question_id, is_online=True
        ).values_list("user_id", flat=True)
    )
    if not user_ids:
        return

    # Skip users who already received a push for this answer today.
    cache_key_tpl = "answer:{date}:{answer_id}"
    cache_key = cache_key_tpl.format(date=str(datetime.now().date()), answer_id=answer.id)

    # The loop variable must not be called `user_id`: that would rebind the
    # parameter, and the nickname lookup below would then fetch the last
    # asker rather than the user passed in by the caller.
    pushed_flags = push_cache.hmget(cache_key, user_ids)
    push_user_ids = [
        asker_id
        for asker_id, has_push in zip(user_ids, pushed_flags)
        if not has_push
    ]

    if not push_user_ids:
        return

    user = UserService.get_user_by_user_id(user_id=user_id)
    content = _get_content_text(answer.content)
    # Keep the quoted answer text short: at most 30 characters plus an ellipsis.
    content = content if len(content) <= 30 else content[:30] + "..."

    push_msg = "「{user_name}」回答了你在「{content}」下的提问,来看看她是怎么说的~".format(user_name=user.nickname, content=content)
    push_type = AUTOMATED_PUSH.QUALITY_QUESTION
    push_url = gm_protocol.get_user_answer_list()
    extra = {
        'type': PUSH_INFO_TYPE.GM_PROTOCOL,
        'msgType': 4,
        'pushUrl': push_url,
        'push_url': push_url,
    }

    push_task_to_user_multi(
        user_ids=push_user_ids,
        push_type=push_type,
        extra=extra,
        labels={'event_type': 'push', 'event': 'quality_question_received_answer'},
        alert=push_msg,
    )

    # Remember who was pushed today; the hash lives until the end of the day.
    push_cache.hmset(cache_key, {asker_id: 1 for asker_id in push_user_ids})
    now = datetime.today()
    seconds_left = (now.replace(hour=23, minute=59, second=59) - now).seconds
    # During the last second of the day the difference is 0 (microseconds are
    # not reset by replace()). Assuming push_cache is Redis-backed, a
    # non-positive TTL would delete the key immediately, so keep it alive for
    # at least one second.
    push_cache.expire(cache_key, max(seconds_left, 1))