from itertools import chain
from gm_types.error import ERROR as CODES
from qa.service import QaFavorServices
from talos.rpc import bind_context, bind
from utils.rpc import gen, get_current_user
from utils.common import convert_map_key2str
from talos.services import (
    get_user_from_context,
)
from talos.cache.base import reply_cache
from talos.tools.replies_tool import ReplyTool
from talos.services import UserConvertService
from qa.services import QualityQuestionService
from qa.models import Answer
from qa.tasks.quality_question_task import write_unread_data, questioning_push


@bind_context("qa/quality_question/create_question")
def quality_question_create_question(ctx, answer_id, question_id):
    """Ask a follow-up ("quality") question on an answer.

    Args:
        ctx: RPC context used to resolve the current user.
        answer_id: primary key of the answer being questioned.
        question_id: identifier passed to
            ``QualityQuestionService.question_check_or_get``.

    Returns:
        The dict produced by ``QualityQuestionService.questioning`` with its
        ``"user"`` entry enriched with user info, or the result of
        ``gen(CODES.LOGIN_REQUIRED)`` when no user is logged in.
    """
    user = get_user_from_context(ctx)
    if not user:
        return gen(CODES.LOGIN_REQUIRED)

    question = QualityQuestionService.question_check_or_get(question_id)
    question_info = QualityQuestionService.questioning(answer_id, question, user.id)

    # The follow-up question succeeded: favorite the answer by default.
    QaFavorServices.create(user_id=user.id, answer_id=answer_id)

    # Fill in user info.
    user_info = UserConvertService.get_user_info_by_user_id(question_info["user"]["id"])
    question_info["user"].update(user_info)

    # Only record the received question for the answer's author when that
    # author is someone other than the asker.
    answer = Answer.objects.filter(pk=answer_id).first()
    if answer and answer.user_id != user.id:
        rt = ReplyTool(reply_cache, answer.user_id)
        rt.receive_quality_question(question_info["user_question_id"])

    # NOTE(review): the file's indentation was lost; these two task dispatches
    # are assumed to run unconditionally (not nested in the `if` above) —
    # confirm against version control.
    write_unread_data.delay(question_info['id'], answer_id, _type='ask_question')
    questioning_push.delay(user.id, answer_id, question_info['id'], question.title)

    return question_info


@bind("qa/quality_question/count_info_by_answer_ids")
def quality_question_info_by_answer_ids(answer_ids):
    """Get follow-up question statistics for the given answers.

    Return:
        {
            answer_id: {
                "question_cnt": 0,
                "answer_cnt": 0,
                "reply_cnt": 0,
            }
        }
        Keys are passed through ``convert_map_key2str`` before returning.
    """
    ret = QualityQuestionService.get_quality_questions_cnt_info_by_answer_ids(answer_ids)
    ret = convert_map_key2str(ret)
    return ret


@bind_context("qa/quality_question/vote")
def quality_question_vote(ctx, quality_question_id):
    """Vote for a follow-up question and return its updated vote count.

    Returns the result of ``gen(CODES.LOGIN_REQUIRED)`` when no user is
    logged in.
    """
    user = get_user_from_context(ctx)
    if not user:
        return gen(CODES.LOGIN_REQUIRED)

    quality_question = QualityQuestionService.quality_question_health_get(quality_question_id)
    QualityQuestionService.vote(user.id, quality_question_id)

    # NOTE(review): the counter is read, incremented and saved in Python, so
    # concurrent votes may overwrite each other — consider an atomic update.
    quality_question.vote_cnt += 1
    quality_question.save()

    return quality_question.vote_cnt


@bind_context("qa/quality_question/cancel_vote")
def quality_question_cancel_vote(ctx, quality_question_id):
    """Cancel the current user's vote and return the resulting vote count.

    The counter is only decremented when
    ``QualityQuestionService.cancel_vote`` returns a truthy value. Returns the
    result of ``gen(CODES.LOGIN_REQUIRED)`` when no user is logged in.
    """
    user = get_user_from_context(ctx)
    if not user:
        return gen(CODES.LOGIN_REQUIRED)

    quality_question = QualityQuestionService.quality_question_health_get(quality_question_id)
    if QualityQuestionService.cancel_vote(user.id, quality_question_id):
        # NOTE(review): same non-atomic read-modify-write as in the vote
        # endpoint — confirm whether concurrent requests matter here.
        quality_question.vote_cnt -= 1
        quality_question.save()

    return quality_question.vote_cnt


@bind_context("qa/quality_question/list_question_by_tags")
def quality_question_list_question_by_tags(ctx, tag_ids, answer_id=None):
    """List candidate follow-up questions for a set of tags.

    Questions come from the smallest tag id that has an entry in the
    tag -> question-ids map; when no question info is found the common
    question list is used instead. Each returned item gets a boolean
    ``"has_question"`` flag that is True when its id is among the ids returned
    by ``QualityQuestionService.user_question_ids`` for the current user
    (always False for anonymous callers).

    return:
        [question_info1, question_info2]
    """
    user = get_user_from_context(ctx)
    tag_question_ids_map = QualityQuestionService.list_question_by_tags(tag_ids)

    # Taking the tag with the smallest ID that has data is enough.
    question_ids = []
    for tag_id in sorted(tag_ids):
        if tag_id in tag_question_ids_map:
            question_ids = tag_question_ids_map[tag_id]
            break

    questions_dict = QualityQuestionService.list_question_by_ids(question_ids)
    if questions_dict:
        # Keep the order of question_ids, skipping ids without info.
        questions_info = [
            questions_dict[question_id]
            for question_id in question_ids
            if question_id in questions_dict
        ]
    else:
        questions_info = QualityQuestionService.list_common_question()
        question_ids = [item["id"] for item in questions_info]

    user_question_ids = []
    if user:
        user_question_ids = QualityQuestionService.user_question_ids(user.id, answer_id, question_ids)

    # Build the lookup set once instead of scanning a list per question.
    asked_ids = set(user_question_ids)
    for question in questions_info:
        question["has_question"] = question["id"] in asked_ids

    return questions_info


@bind_context("qa/quality_question/list_quality_question_by_quality_question_ids")
def list_quality_question_by_quality_question_ids(ctx, quality_question_ids):
    """Get follow-up question info by follow-up question IDs.

    Return:
        {
            quality_question_id: {
                "id": quality_question.id,
                "question_id": quality_question.question_id,
                "ask_cnt": quality_question.ask_cnt,
                "vote_cnt": quality_question.vote_cnt,
                "title": title,
                "is_voted": True/False,
            }
        }
        Keys are passed through ``convert_map_key2str`` before returning.
    """
    user = get_user_from_context(ctx)
    questions = QualityQuestionService.list_quality_questions_info(quality_question_ids, user)
    questions = convert_map_key2str(questions)
    return questions


@bind("qa/quality_question/quality_question_ids_by_answer_ids")
def quality_question_ids_by_answer_ids(answer_ids, viewer=None, offset=0, count=2):
    """Get the follow-up question id lists under the given answers.

    ``viewer`` is forwarded to the service as ``user_id``; ``offset`` and
    ``count`` are forwarded unchanged.

    Return:
        {
            answer_id: [quality_question_id1, quality_question_id2, quality_question_id3],
        }
        Keys are passed through ``convert_map_key2str`` before returning.
    """
    questions = QualityQuestionService.quality_question_ids_by_answer_ids(
        answer_ids, user_id=viewer, offset=offset, count=count)
    questions = convert_map_key2str(questions)
    return questions


@bind("qa/quality_question/quality_question_ids_by_answer_id")
def quality_question_ids_by_answer_id(answer_id, viewer=None, offset=0, count=2):
    """Get the follow-up question id list under a single answer, plus a total.

    ``viewer`` is forwarded to the service as ``user_id``; ``offset`` and
    ``count`` are forwarded unchanged. The ``"total"`` entry is the value of
    ``QualityQuestionService.count_by_answer_id(answer_id)``.

    Return:
        {
            "total": 10,
            answer_id: [quality_question_id1, quality_question_id2, quality_question_id3],
        }
    """
    questions = QualityQuestionService.quality_question_ids_by_answer_ids(
        [answer_id], user_id=viewer, offset=offset, count=count)
    questions = convert_map_key2str(questions)
    questions["total"] = QualityQuestionService.count_by_answer_id(answer_id)
    return questions


@bind("qa/quality_question/question_received")
def quality_question_received(question_ids):
    """Build display info for follow-up questions received by the current user.

    Args:
        question_ids: ids passed to
            ``QualityQuestionService.list_user_question_by_user_quality_ids``.

    Returns:
        ``{}`` when ``question_ids`` is empty, the result of
        ``gen(CODES.LOGIN_REQUIRED)`` when no user is logged in, otherwise::

            {
                str(id): {
                    "id": id,
                    "answer_id": ...,
                    "create_time": "YYYY-mm-dd HH:MM:SS",
                    "user": {...},            # asker info, {} if unknown
                    "content": "...",         # question title, "" if unknown
                    "replied_user": {...},    # current user info, {} if unknown
                    "replied_content": ...,   # answer content, None if missing
                }
            }
    """
    if not question_ids:
        return {}

    user = get_current_user()
    if not user:
        return gen(CODES.LOGIN_REQUIRED)

    questions_dict = QualityQuestionService.list_user_question_by_user_quality_ids(question_ids)

    answer_ids = [item["answer_id"] for item in questions_dict.values()]
    answers_dict = Answer.objects.filter(pk__in=answer_ids).in_bulk(answer_ids)

    quality_question_ids = [item["quality_question_id"] for item in questions_dict.values()]
    questions = QualityQuestionService.get_question_info_by_quality_question_ids(quality_question_ids)

    # Fetch info for every user involved in a single call: the current user,
    # the askers, and the answer authors.
    user_ids = [user.user_id]
    user_ids.extend(item["user_id"] for item in questions_dict.values())
    user_ids.extend(answer.user_id for answer in answers_dict.values())
    user_dict = UserConvertService.get_user_info_by_user_ids(user_ids)

    result = {}
    for _id, item in questions_dict.items():
        question = questions.get(item["quality_question_id"])
        answer = answers_dict.get(item["answer_id"])
        result[str(_id)] = {
            "id": _id,
            "answer_id": item["answer_id"],
            "create_time": item["create_time"].strftime("%Y-%m-%d %H:%M:%S"),
            "user": user_dict.get(item["user_id"], {}),
            "content": question.title if question else "",
            "replied_user": user_dict.get(user.user_id, {}),
            # None when the answer no longer exists.
            "replied_content": answer.content if answer else None,
        }

    return result