from talos.rpc import bind_context, bind
from gm_types.error import ERROR as CODES

from utils.common import convert_map_key2str
from utils.rpc import gen, get_current_user
from talos.services import (
    get_user_from_context,
)
from talos.cache.base import reply_cache
from talos.tools.replies_tool import ReplyTool
from talos.services import UserConvertService
from qa.services import QualityQuestionService, QualityAnswerService
from qa.tasks.quality_question_task import write_unread_data, answer_push


@bind_context("qa/quality_question/create_answer")
def quality_question_create_answer(ctx, quality_question_id, content, images):
    """Create an answer to a quality question on behalf of the current user.

    The handler creates the answer, bumps the question's ``answer_cnt``,
    enriches the answer's ``user`` entry with converted user info, records the
    new answer in the reply cache of every user returned by
    ``QualityQuestionService.list_user_ids_by_id`` (except the author), and
    finally schedules the unread-data and push tasks.

    :param ctx: RPC context used to resolve the current user.
    :param quality_question_id: ID of the question being answered.
    :param content: Answer content.
    :param images: Images attached to the answer.
    :return: The answer info dict, or ``gen(CODES.LOGIN_REQUIRED)`` when no
        user can be resolved from ``ctx``.
    """
    user = get_user_from_context(ctx)
    if not user:
        return gen(CODES.LOGIN_REQUIRED)

    quality_question = QualityQuestionService.quality_question_health_get(quality_question_id)
    answer_info = QualityAnswerService.create_answer(quality_question_id, user.id, content, images)

    # NOTE(review): this is a read-modify-write on the loaded object; two
    # concurrent answers could lose an increment. Confirm whether the model
    # layer offers an atomic update before relying on this counter.
    quality_question.answer_cnt += 1
    quality_question.save()

    # fill user info
    user_info = UserConvertService.get_user_info_by_user_id(answer_info["user"]["id"])
    answer_info["user"].update(user_info)

    user_ids = QualityQuestionService.list_user_ids_by_id(quality_question_id)
    for user_id in user_ids:
        # The author does not need a notification about their own answer.
        if user_id == user.id:
            continue
        rt = ReplyTool(reply_cache, user_id)
        rt.receive_quality_answer(answer_info["id"])

    write_unread_data.delay(quality_question_id, _type='create_answer')
    answer_push.delay(user.id, quality_question_id)

    return answer_info


@bind("qa/quality_question/list_quality_answer")
def quality_question_list_answer(quality_question_ids, offset=0, count=2):
    """Fetch the answers belonging to the given quality question IDs.

    The mapping returned by the service is passed through
    ``convert_map_key2str`` before being returned.

    Return (shape as documented by the original author):
        {
            quality_question_id: [
                {
                    "id": answer.id,
                    "content": answer.content,
                    "quality_question_id": answer.quality_question_id,
                    "user": {
                        'id': user.id,
                        'user_id': user.id,
                        'person_id': user.person_id,
                        'user_name': user.nickname,
                        'membership_level': user.membership_level,
                        'portrait': user.portrait,
                        'user_level': {
                            'membership_icon': user.membership_icon,
                            'level_icon': user.level_icon,
                            'constellation_icon': user.constellation_icon,
                        },
                    },
                    "images": [
                        {"url": "", "width": 1, "height": 1}
                    ]
                }
            ]
        }
    """
    quality_question_answers = QualityAnswerService.answers_by_quality_question_ids(
        quality_question_ids, offset=offset, count=count
    )
    return convert_map_key2str(quality_question_answers)


@bind("qa/quality_question/quality_answer_by_quality_question_id")
def quality_question_answer_by_quality_question_id(quality_question_id, offset=0, count=2):
    """Fetch one page of answers for a single quality question.

    Return:
        {
            "answers": [
                answer_info,
            ],
            "rest_total": 1,
        }

    ``rest_total`` is the question's ``answer_cnt`` minus ``offset + count``,
    clamped at zero. When the service returns nothing, the empty default
    result is returned and the question itself is not loaded.
    """
    result = {
        "answers": [],
        "rest_total": 0,
    }

    quality_question_answers = QualityAnswerService.answers_by_quality_question_ids(
        [quality_question_id], offset=offset, count=count
    )
    if not quality_question_answers:
        return result

    result["answers"] = quality_question_answers.get(quality_question_id, [])

    quality_question = QualityQuestionService.quality_question_health_get(quality_question_id)
    # Never report a negative remainder when the page reaches past the end.
    result["rest_total"] = max(quality_question.answer_cnt - (offset + count), 0)

    return result


@bind_context('qa/quality_question/unread_info')
def get_my_page_qa_unread_count(ctx):
    """
    "My page": unread red-dot counters for the Favorites-Answers and
    Questions-Answers tabs.

    :param ctx: RPC context used to resolve the current user.
    :return: the service's unread counters for the user; both counters are
        zero when no user can be resolved from ``ctx``.
    """
    result = {
        "favor_answer_unread_count": 0,
        "my_answer_unread_count": 0,
    }

    user = get_user_from_context(ctx)
    if not user:
        return result

    return QualityAnswerService.get_my_page_qa_unread_count(user_id=user.id)


@bind_context('qa/quality_question/answer_read')
def quality_question_answer_read(ctx):
    """
    "My page" - Answers tab: mark messages as read.

    :param ctx: RPC context used to resolve the current user.
    :return: ``gen(CODES.SUCCESS)`` in every case; nothing is marked when no
        user can be resolved from ``ctx``.
    """
    user = get_user_from_context(ctx)
    if not user:
        return gen(CODES.SUCCESS)

    QualityAnswerService.my_page_answer_tab_read(user_id=user.id)

    return gen(CODES.SUCCESS)


@bind_context('qa/quality_question/favor_answer_read')
def quality_question_favor_answer_read(ctx):
    """
    "My page" - Favorites - Answers tab: mark messages as read.

    :param ctx: RPC context used to resolve the current user.
    :return: ``gen(CODES.SUCCESS)`` in every case; nothing is marked when no
        user can be resolved from ``ctx``.
    """
    user = get_user_from_context(ctx)
    if not user:
        return gen(CODES.SUCCESS)

    QualityAnswerService.my_page_favor_answer_tab_read(user_id=user.id)

    return gen(CODES.SUCCESS)


@bind('qa/quality_question/answer_received')
def quality_question_answer_received(answer_ids):
    """Build display info for answers received by the current user.

    Each answer is enriched with its ``id``, the replying ``user``, the
    question title as ``replied_content`` and the current user as
    ``replied_user``. Answers whose replying user or question cannot be
    found are still returned, only without the fields that could not be
    filled in.

    :param answer_ids: IDs of the answers to look up.
    :return: ``{str(answer_id): answer_dict}``, or
        ``gen(CODES.LOGIN_REQUIRED)`` when there is no current user.
    """
    user = get_current_user()
    if not user:
        return gen(CODES.LOGIN_REQUIRED)

    answers = QualityAnswerService.list_answer_by_ids(answer_ids)

    user_ids = [item["user_id"] for item in answers.values()]
    user_dict = UserConvertService.get_user_info_by_user_ids(user_ids)

    quality_question_ids = [item["quality_question_id"] for item in answers.values()]
    questions_dict = QualityQuestionService.get_question_info_by_quality_question_ids(quality_question_ids)

    for _id, item in answers.items():
        # Convert the timestamp before any `continue`: every entry is part of
        # the response, so every entry must carry the same string format
        # rather than a raw datetime for the partially filled ones.
        item["create_time"] = item["create_time"].strftime("%Y-%m-%d %H:%M:%S")

        reply_user = user_dict.get(item["user_id"])
        if not reply_user:
            continue

        item["id"] = _id
        item["user"] = reply_user

        question = questions_dict.get(item["quality_question_id"])
        if not question:
            continue

        item["replied_content"] = question.title
        item["replied_user"] = {
            "user_name": user.nick_name,
            "portrait": user.portrait,
        }

    return {
        str(_id): answer
        for _id, answer in answers.items()
    }