# coding:utf8 from __future__ import unicode_literals, absolute_import, print_function import datetime import json import os import random from collections import defaultdict from itertools import chain from distutils.version import LooseVersion import time import math from multiprocessing import Pool, Manager from celery import shared_task from django.conf import settings from django import db from django.db.models import Q, Max from gm_types.gaia import TAG_TYPE, DIARY_CONTENT_LEVEL, DIARY_ORDER_TYPE, TAG_V3_TYPE from gm_types.push import PUSH_INFO_TYPE, PERSONAL_PUSH_TYPE, AUTOMATED_PUSH from gm_types.mimas import APPLET_PAGE_FROM, APPLET_SUBSCRIBE_MSG_TYPE from gm_protocol import GmProtocol from talos.tools.user_tool import get_username_with_title from utils.decorator import thread_local_router from utils.protocol import gm_protocol, gmdoctor_protocol from utils.rpc import rpc_client, logging_exception from utils.push import push_task_to_user_multi, limit_push_by_uids, doctor_noti_doctor_ids, push_time_range_limit from talos.cache.base import ( high_quality_user_cache, doctor_tags_cache, doctor_tagv3s_cache, hospital_topics_num_cache, ) from talos.cache.takesofacache import take_sofa_cache from talos.models.diary import Diary, DiaryExtra, DiaryTag, DiaryTagV3 from talos.services.doctor import DoctorService from talos.services.tag import TagService from talos.services.order import OrderService from talos.tools.topic_tool import filter_user_by_topic_to_push from talos.models.topic import TopicReply, TopicReplyVote from talos.services import UserService, TagV3Service from utils.push import send_applet_subscribe_msg from talos.rpc import get_current_rpc_invoker from communal.tasks import intelligent_push_task @shared_task def doctor_diary_tags(): """ 当前排序规则: 按照帖子案例数 从大到小 排序 7.6.25 新加需求 如果两个标签案例数最后一样多,就以日记帖子的总个数进行降序排序; """ def get_subitem_data(tag_list, tag_service, **kwargs): """ 获取二级标签项目 :param tag_list:标签列表 :param tag_service: 标签方法 :param kwargs: {"second_level": x, "third_level": x, "tns": xx} :return: """ result = [] tns = kwargs.get("tns", 0) second_level = kwargs.get("second_level", "") third_level = kwargs.get("third_level", "") items2 = list(filter(lambda tag: tag.tag_type == second_level, tag_list)) if items2: result.extend([{ "name": item.name, "tag_id": item.id, "tns": tns, } for item in items2]) else: subitems = list(filter(lambda tag: tag.tag_type == third_level, tag_list)) items3 = tag_service.get_closure_tags(tag_ids=[subitem.id for subitem in subitems]) result.extend([{ "name": item.name, "tag_id": item.id, "tns": tns, } for item in items3 if item.tag_type == second_level]) return result def convert_cache_info(item_list): """ 转换缓存信息 :param item_list: :return: """ name_dict = {} for item in item_list: name = item['name'] if name not in name_dict: # name_dict[name] = {'name': name, 'cnt': 1, 'tag_id': item['tag_id']} name_dict[name] = { 'name': name, 'cnt': 1, 'tag_id': item['tag_id'], "tns": item["tns"] } else: name_dict[name]['cnt'] += 1 name_dict[name]['tns'] += item["tns"] ordered_item_list = sorted(name_dict.values(), key=lambda x: (x["cnt"], x["tns"]), reverse=True) # 双倍缓存,因为返回数据不一样 cache_info = { 'item_list': [u'{}'.format(v['name']) for v in ordered_item_list], 'item_list_raw': [{ 'name': u'{} {}例'.format(v['name'], v['cnt']), 'tag_id': v['tag_id'], } for v in ordered_item_list], } return cache_info first_index = 0 gap = 1000 v1_prefix = doctor_tags_cache.prefix v3_prefix = doctor_tagv3s_cache.prefix while True: doctor_ids = DoctorService.get_doctor_ids(first_index, gap) first_index += gap if not doctor_ids: break v1_doctor_items, v3_doctor_items = dict(), dict() for d_id in doctor_ids: diarys = Diary.objects.filter( is_online=True, topics__isnull=False, topics__flag='n', topics__is_online=True, doctor_id=d_id, ).distinct() diary_ids = [diary.id for diary in diarys] diary_tag_v3_ids = DiaryTagV3.objects.filter(diary_id__in=diary_ids).values_list("diary_id", "tag_v3_id") diary_tag_ids = DiaryTag.objects.filter(diary_id__in=diary_ids).values_list("diary_id", "tag_id") _diary_rel_tag_v3_dic, _diary_rel_tag_dic = defaultdict(list), defaultdict(list) for diary_id, tag_v3_id in diary_tag_v3_ids: _diary_rel_tag_v3_dic[diary_id].append(tag_v3_id) for diary_id, tag_id in diary_tag_ids: _diary_rel_tag_dic[diary_id].append(tag_id) tag_v3_infos = TagV3Service.get_tags_by_tag_v3_ids(set(chain(*_diary_rel_tag_v3_dic.values()))) tag_infos = TagService.get_tags_dict_by_tag_ids(set(chain(*_diary_rel_tag_dic.values()))) v1_item_list_raw, v3_item_list_raw = [], [] for diary in diarys: tns = diary.topic_num # topic numbers 当前日记本下所有帖子的个数 v1_tags_list = [tag_infos[tag_id] for tag_id in _diary_rel_tag_dic.get(diary.id, []) if tag_id in tag_infos] v3_tags_list = [tag_v3_infos[tag_id] for tag_id in _diary_rel_tag_v3_dic.get(diary.id, []) if tag_id in tag_v3_infos] if v1_tags_list: v1_item_list_raw.extend(get_subitem_data(v1_tags_list, TagService, **{ "second_level": TAG_TYPE.BODY_PART_SUB_ITEM, "third_level": TAG_TYPE.ITEM_WIKI, "tns": tns })) if v3_tags_list: v3_item_list_raw.extend(get_subitem_data(v3_tags_list, TagV3Service, **{ "second_level": TAG_V3_TYPE.SECOND_CLASSIFY, "third_level": TAG_V3_TYPE.NORMAL, "tns": tns })) v1_doctor_items.update({ "{}:{}".format(v1_prefix, d_id): json.dumps(convert_cache_info(v1_item_list_raw)), }) v3_doctor_items.update({ "{}:{}".format(v3_prefix, d_id): json.dumps(convert_cache_info(v3_item_list_raw)), }) doctor_tags_cache.mset(**v1_doctor_items) doctor_tagv3s_cache.mset(**v3_doctor_items) @shared_task def take_sofa_diary(): last_index = 1000 diaries = Diary.objects.filter(is_online=True, reply_num=0).order_by('-id')[:last_index] for diary in diaries: if diary.topics.count() == 0: continue has_comments = False for topic in diary.topics.all(): if topic.reply_num: has_comments = True break if has_comments: continue create_time = time.mktime(diary.last_modified.timetuple()) take_sofa_cache.set_diary_cache_data(create_time, diary.id) take_sofa_cache.del_diary_all_cache(last_index, -1) @shared_task def get_index_diary_detail(): size = 1000 index = [] for i in range(int(size / 100)): _index = 100 * i id = rpc_client['api/diary/filter_diary'](offset=_index, size=100, sort_type=DIARY_ORDER_TYPE.INDEX).unwrap()['diary_ids'] index += id file_dir = settings.LOG_DIR + 'index_diary/' if not os.path.exists(file_dir): os.mkdir(file_dir) file_name = file_dir + 'score_detail.log' now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") index_diary_detail = _generate_diary_detail(index) with open(file_name, 'wb') as f: f.write(u'根据首页的排序\n'.encode('utf-8')) f.write(u'{}\n'.format(now).encode('utf-8')) _write_file(f, index_diary_detail) f.close() def _generate_diary_detail(diary_ids): diaries = [Diary.objects.get(pk=i) for i in diary_ids] detail = [] for d in diaries: try: username = d.user.last_name except: username = u'无' try: service_name = d.service.name except: service_name = u'无' info = (d.id, d.get_index_rank(), username, service_name) detail.append(info) return detail def _write_file(fd, values): log_template = u'日记本id: {}, 热度分: {}, 内容质量分: {}, 医生抽成分: {}, 用户名: {}, 美购名: {}\n' for value in values: id = value[0] heat_score, audit_score, choucheng_score = value[1] username, service_name = value[2], value[3] log = log_template.format(id, heat_score, audit_score, choucheng_score, username, service_name) fd.write(log.encode('utf-8')) @shared_task def default_diary_rating(): today = datetime.datetime.today() diarys = Diary.objects.using(settings.SLAVE_DB_NAME).filter(is_online=True, rate_count=0) # 手术类订单验证后60天后,非手术类订单验证后30天 for diary in diarys: order = OrderService.get_order_from_order_id(diary.order_id) if not order: continue if not order.validated: continue if not order.validate_time: continue order_validate_time = datetime.datetime.strptime(order.validate_time, "%Y-%m-%d %H:%M:%S") if order.service_is_operation: before_60 = today - datetime.timedelta(days=60) if order_validate_time < before_60: diary.rating = 5.0 diary.operation_effect_rating = 5.0 diary.doctor_attitude_rating = 5.0 diary.hospital_env_rating = 5.0 diary.rate_count = 1 diary.save() else: before_30 = today - datetime.timedelta(days=30) if order_validate_time < before_30: diary.rating = 5.0 diary.operation_effect_rating = 5.0 diary.doctor_attitude_rating = 5.0 diary.hospital_env_rating = 5.0 diary.rate_count = 1 diary.save() @shared_task @thread_local_router(DB_FOR_READ_OVERRIDE=settings.SLAVE_DB_NAME) def create_high_quality_user_to_redis(): diaries = Diary.objects.filter(is_online=True, content_level=DIARY_CONTENT_LEVEL.EXCELLENT).values('user_id') _user_ids = [] for diary in diaries: _user_ids.append(diary['user_id']) _user_ids = set(_user_ids) _doctors = DoctorService.get_doctor_from_user_ids(list(_user_ids)) _doctor_user_ids = [int(d.user_id) for d in _doctors] user_ids = _user_ids - set(_doctor_user_ids) user_ids = [str(id) for id in user_ids] high_quality_user_cache.set('high_quality_user', ','.join(user_ids)) return user_ids @shared_task def remind_user_to_update_diary(): ''' 在用户更新日记本后的第3、7、15、21、30天,提醒用户更新 http://wiki.wanmeizhensuo.com/pages/viewpage.action?pageId=4430390 :return: ''' alert_dict = { #3: u'你的更美日记该更新啦!我们都很想你,分享一下你的最新动态吧!', 7: u'嘤嘤~亲爱的,日记该更新啦!分享一下最新美丽动态吧!', 15: u'您最近做的项目效果怎么样?来写篇日记分享一下吧!', 21: u'嗨,您的更美日记已经好久没更新了,有空来分享一下吧~', # 30: u'你的更美日记该更新啦!坚持记录自己的变美经历,你会为自己感到骄傲哒!', } today = datetime.datetime.today() thirty_days_before = today - datetime.timedelta(days=21) try: push_lst = filter_user_by_topic_to_push(start_time=thirty_days_before.strftime('%Y-%m-%d')) gm_protocol = GmProtocol() uid_dict_verify = {item['uid']: PERSONAL_PUSH_TYPE.DIARY_ALERT for item in push_lst if item['uid']} push_user_ids = [uid for uid in uid_dict_verify.items()] for item in push_lst: if item['uid'] in push_user_ids: # 根据created_time 区分文案, 所以没法批量调取push delta_days = datetime.datetime(today.year, today.month, today.day) - datetime.datetime( item['created_time'].year, item['created_time'].month, item['created_time'].day) if delta_days.days in alert_dict: extra = { 'type': PUSH_INFO_TYPE.GM_PROTOCOL, 'msgType': 4, 'pushUrl': gm_protocol.get_diary_detail(item['diary_id']), 'push_url': gm_protocol.get_diary_detail(item['diary_id']) } push_task_to_user_multi(user_ids=[item['uid']], extra=extra, push_type=AUTOMATED_PUSH.JOURNAL_UPDATE_PROMPT, alert=alert_dict[delta_days.days]) except: logging_exception() @shared_task def hospital_topics_num(): count = 10000 diaries_ids = [] hospital_diaries_dic = {} diary_extra_dict = {} def _update_diary_extra_dict(diaries_ids): diary_extra = DiaryExtra.objects.using(settings.SLAVE_DB_NAME).filter( diary_id__in=diaries_ids).values_list("diary_id", "topic_count").iterator() diary_extra_dict.update(dict(diary_extra)) diaries = Diary.objects.using(settings.SLAVE_DB_NAME).filter( hospital_id__isnull=False, is_online=True).values("id", "hospital_id").iterator() for diary in diaries: hospital_id = diary["hospital_id"] if hospital_id not in hospital_diaries_dic: hospital_diaries_dic[hospital_id] = [] hospital_diaries_dic[hospital_id].append(diary["id"]) diaries_ids.append(diary["id"]) if len(set(diaries_ids)) == count: _update_diary_extra_dict(list(set(diaries_ids))) diaries_ids = [] if len(set(diaries_ids)): _update_diary_extra_dict(list(set(diaries_ids))) for hid, diary_id_list in hospital_diaries_dic.items(): nums = sum(diary_extra_dict.get(diary_id, 0) for diary_id in diary_id_list) hospital_topics_num_cache.set(hid, nums) def update_pv(queue, limit): """更新DiaryExtra中的total_pv字段。 start_num, end_num 为根据id排序的起始偏移量和这次获取的数据总量。 """ start_id = queue.get() diary_extras = DiaryExtra.objects.filter(Q(diary__pk__gt=start_id))[: limit] max_id = diary_extras.aggregate(max_id=Max('diary_id')) queue.put(max_id["max_id"]) if not diary_extras: return for extra_info in diary_extras: diary = extra_info.diary extra_info = diary.extra_info extra_info.total_pv = diary.view_num extra_info.save() @shared_task def update_diary_pv(): """更新日记本的 pv 数。 每次去一定数量(per_num)的 diary 进行更新。其中日记本的pv数存储在redis中,另外 topic的pv数存储在数据库当中。 """ queue = Manager().Queue(maxsize=4) queue.put(0) # 触发程序开始 args_list = [] per_num = 500 count = DiaryExtra.objects.count() cnt = math.ceil(count / per_num) for _ in range(cnt): args_list.append((queue, per_num)) db.connections.close_all() pool = Pool(processes=4) pool.starmap(update_pv, args_list) pool.close() pool.join() def push_diary_reply_message( user_id: int, content: str, replied_reply_user_id: int, reply_id=None, topic_id=None, diary_user_id: int = None, topic_user_id: int = None) -> None: """ NOTE: 异步进行消息提醒 :param user_id: 评论者 :param content: 评论内容 :param topic_id: 日记帖id :param replied_reply_user_id: 一级评论作者 :param reply_id: 二级评论id :param diary_user_id: 日记本作者 :param topic_user_id: 日记帖作者 :return: """ try: if len(content) > 25: content = content[:25] + u'...' reply_username = get_username_with_title(user_id) msg = u'%s 回复你:%s' % (reply_username, content) receive_msg_user_ids = [] if replied_reply_user_id: if user_id != replied_reply_user_id: receive_msg_user_ids.append(replied_reply_user_id) else: if diary_user_id: if user_id != diary_user_id: receive_msg_user_ids.append(diary_user_id) else: # topic if user_id != topic_user_id: receive_msg_user_ids.append(topic_user_id) platform = ['android', 'iPhone'] # user_dict_to_verify = {uid: push_type for uid in set(receive_msg_user_ids)} # push_user_ids = limit_push_by_uids(user_dict_to_verify) # 用户推送,每日限制,得到可推的用户列表 if receive_msg_user_ids and topic_id: # 因为7715推送有版本兼容问题。 所以需要对推送者一个一个推 for _user_id in receive_msg_user_ids: # 1、2级评论统一走 demeter 服务防干扰push push_url = gm_protocol.get_user_answer_list() alert = u'@{}:{}...'.format(reply_username, content[:25]) if reply_id: push_type = AUTOMATED_PUSH.COMMENT_RECEIVED_REPLY content_id = reply_id else: push_type = AUTOMATED_PUSH.DIARY_POST_RECEIVED_COMMENTS content_id = topic_id # 帖子评论暂时没有支持图片,这个地方的push不加图片规则 intelligent_push_task.delay( content_id=content_id, user_ids=[_user_id], push_type=push_type, platform=None, extra={ 'type': PUSH_INFO_TYPE.GM_PROTOCOL, 'pushUrl': push_url, 'push_url': push_url, 'image': '', 'push_image': '', }, alert=alert, others={ "title": "你收到了一条新评论", "alert": alert, }, labels={'event_type': 'push', 'event': 'received_diary_comments'}, ) dortor_list = [] for receive_msg_user_id in set(receive_msg_user_ids): # if receiver is a doctor, push to doctor client try: recv_doctor = DoctorService.get_doctor_by_user_id(receive_msg_user_id) except IndexError: recv_doctor = None if recv_doctor: dortor_list.append(recv_doctor.id) if dortor_list: extra = { 'type': PUSH_INFO_TYPE.GM_PROTOCOL, 'pushUrl': gmdoctor_protocol.get_comment_list(), } doctor_noti_doctor_ids(doctor_ids=dortor_list, platform=platform, extra=extra, alert=msg) except: logging_exception() @shared_task def applet_diary_replied_push(topic_reply_id): """ 日记本的评论被评论,给日记本评论的用户发送小程序推送 自己给自己评论无效 :param topic_reply_id: 发出的那条回复ID :return: """ if not topic_reply_id: return try: reply_info = TopicReply.objects.get(id=topic_reply_id, is_online=True) except TopicReply.DoesNotExist: return if not reply_info.replied_topic_id: return try: replied_info = TopicReply.objects.get(id=reply_info.replied_topic_id) except TopicReply.DoesNotExist: return reply_content = reply_info.content[:20] nickname = UserService.get_user_by_user_id(reply_info.user_id).nickname[:10] data = { "name1": { "value": nickname }, "thing2": { "value": reply_content }, "date3": { "value": "{date}".format(date=datetime.datetime.now().strftime('%Y年%m月%d日 %H:%M')) }, "thing4": { "value": "点击快速查看>>" } } # 模板id template_id = settings.APPLET_SUBSCRIBE_MSG.get("comment", "") # 跳转到日记本详情页 TODO 确认日记本评论的key page = '/packageBBS/pages/diary/detail/detail?diary_id={diary_id}&topic_reply={topic_reply}&from={from_page}&from_action={from_action}'.format( diary_id=reply_info.diary_id, topic_reply=topic_reply_id, from_page=APPLET_PAGE_FROM.CARD, from_action=APPLET_SUBSCRIBE_MSG_TYPE.COMMENT ) # 给一级评论用户发 if reply_info.replied_topic_id != reply_info.commented_reply_id: try: level_1_reply = TopicReply.objects.get(id=reply_info.commented_reply_id) except: level_1_reply = None if level_1_reply: # 不用给自己发 if reply_info.user_id != level_1_reply.user_id: # 发送小程序推送 send_applet_subscribe_msg(level_1_reply.user_id, template_id, data=data, page=page) # 给被评论用户发 自己给自己评论无效 if reply_info.user_id == replied_info.user_id: return # 发送小程序推送 send_applet_subscribe_msg(replied_info.user_id, template_id, data=data, page=page) @shared_task def applet_diary_reply_summary_push(topic_reply_id): """ 用户评论完24小时后, 如果没有收到评论或点赞,则给用户推送日记本新增一级评论总数,或日记本的相关内容 :param reply_id: 当前评论id :return: """ if not topic_reply_id: return try: reply_info = TopicReply.objects.get(id=topic_reply_id) except TopicReply.DoesNotExist: return user_id = reply_info.user_id # 用户对日记本或日记帖的新增评论 new_topic_reply = TopicReply.objects.using(settings.SLAVE_DB_NAME).\ filter(id__gt=topic_reply_id, user_id=user_id, is_online=True).exists() if new_topic_reply: return replied_count = TopicReply.objects.using(settings.SLAVE_DB_NAME).\ filter(replied_topic_id=topic_reply_id, is_online=True).exclude(user_id=user_id).count() voted_count = TopicReplyVote.objects.using(settings.SLAVE_DB_NAME).filter(topic_reply_id=topic_reply_id).\ exclude(user_id=user_id).count() diary_title = reply_info.diary.title # 当前评论有被评论或被赞 if replied_count or voted_count: return diary_id = reply_info.diary_id user_id = reply_info.user_id # 日记本新增的一级评论数(不包含自己的) additional_reply_count = TopicReply.objects.using(settings.SLAVE_DB_NAME).\ filter(id__gt=topic_reply_id, diary_id=diary_id, commented_reply__isnull=True, is_online=True).\ exclude(user_id=user_id).count() # 有新增评论,发送24小内新增评论总数 if additional_reply_count: data = { "thing1": { "value": diary_title }, "thing2": { "value": "你评论的日记,新增{reply_count}条吐槽,立即查看".format(reply_count=additional_reply_count)[:20] } } # 模板id template_id = settings.APPLET_SUBSCRIBE_MSG.get("vote", "") # 跳转到日记本详情页 page = '/packageBBS/pages/diary/detail/detail?diary_id={diary_id}&from={from_page}&from_action={from_action}'.format( diary_id=diary_id, from_page=APPLET_PAGE_FROM.CARD, from_action=APPLET_SUBSCRIBE_MSG_TYPE.NEW_COMMENT ) # 无新增评论,推送评论的帖子的相同标签下的其它内容详情页 else: data = { "thing2": { "value": diary_title }, "thing4": { "value": "亲!你关注过的话题又有新内容啦>>" } } # 模板id template_id = settings.APPLET_SUBSCRIBE_MSG.get("recommend", "") # 获取日记本标签 tag_id_list = list(DiaryTag.objects.filter(diary_id=diary_id).values_list('tag_id', flat=True)) # 调策略 获取相关日记本 def get_new_diary_id(rpc_client, offset, tag_id_list): res = rpc_client['doris/search/query_filter_diary'](sort_type=DIARY_ORDER_TYPE.DEFAULT, filters={"tag_ids": tag_id_list}, size=1, offset=offset).unwrap() new_diary_id = res and res.get("diary_ids") and res.get("diary_ids")[0] return new_diary_id rpc_client = get_current_rpc_invoker() offset = random.randint(0, 200) origin_offset = offset new_diary_id = None num = 6 while not new_diary_id and num: try: new_diary_id = get_new_diary_id(rpc_client, offset, tag_id_list) except: new_diary_id = None offset = origin_offset % num if num > 1: offset += random.randint(0, 10) num -= 1 if not new_diary_id: return # 跳转到日记本详情页 page = '/packageBBS/pages/diary/detail/detail?diary_id={diary_id}&from={from_page}&from_action={from_action}'.format( diary_id=new_diary_id, from_page=APPLET_PAGE_FROM.CARD, from_action=APPLET_SUBSCRIBE_MSG_TYPE.RELATED_CONTENT ) # 发送小程序推送 send_applet_subscribe_msg(user_id, template_id, data=data, page=page)