#!/usr/bin/env python # -*- coding: utf-8 -*- ''' __title__ = '' __author__ = 'xierong@gmei.com' __mtime__ = '17/12/22' ''' from __future__ import unicode_literals, absolute_import, print_function import datetime import json import random import re from urllib.request import urlopen from datetime import timedelta from bs4 import BeautifulSoup from celery import shared_task from django.conf import settings from django.db.models import Count from django.utils import timezone from gm_types.gaia import TOPIC_TYPE, DIARY_ORDER_TYPE from gm_types.push import PUSH_INFO_TYPE, AUTOMATED_PUSH from gm_types.mimas import APPLET_PAGE_FROM, APPLET_SUBSCRIBE_MSG_TYPE from gm_protocol import GmProtocol from gm_upload import upload, IMG_TYPE from gm_upload.utils.image_utils import Picture from utils.rpc import rpc_client, logging_exception from utils.push import push_task_to_user_multi, special_push_limit from utils.wx import WxTkApi from utils.push import send_applet_subscribe_msg from qa.models.answer import Question, Answer, QuestionImage, AnswerImage, QuestionTag, AnswerReply from talos.cache.base import wechat_cache from talos.services import UserService from talos.models.topic import ( Problem, ProblemTag, TopicReply, TopicReplyVote, ) from talos.models.doctor import DoctorMessageList from talos.models.topic import TopicVote from talos.models.topic import WechatMaterial from .vote import fake_vote from communal.normal_manager import ( tag_manager, ) from talos.rpc import get_current_rpc_invoker from qa.libs import _get_content_text # @shared_task # def hot_in_24hrs(): # query = TopicVote.objects.filter( # vote_time__range=(timezone.now() - timedelta(hours=24), timezone.now()) # ) # query = query.values('topic') # result = query.annotate(vote_num=Count('topic')).order_by('-vote_num') # # data = [] # # user_ids = set() # start = 0 # count = 20 # # while True: # result = result[start:start + count] # if not result: # break # # start += count # topic_ids = [d['topic'] for d in result] # topics = Problem.objects.filter( # pk__in=topic_ids, is_online=True, is_sink=False) # # for t in topics: # if t.user.id in user_ids: # continue # user_ids.add(t.user.id) # data.append(t.id) # # if len(user_ids) >= count: # break # # # save data in redis # hot_in_24hrs_cache.set('topics', json.dumps(data)) # return data @shared_task def push_diary_update_to_follow(diary_id, user_id, diary_title=None): """ 用户发日记贴 推送给粉丝 :param diary_id: :param user_id: :return: """ if str(user_id) == '22': return try: user_name = UserService.get_user_by_user_id(user_id).nickname except: logging_exception() return if user_name.startswith(u'更美用户'): user_name = u'匿名美人' try: user_ids = rpc_client['api/user/get_all_fans'](user_id=user_id).unwrap() #user_ids = [uid for uid in user_ids if special_push_limit(uid, "user_released_diary", user_id)] except: logging_exception() return kwargs = { "user_ids": user_ids, "platform": ['android', 'iPhone'], "alert": u'嗨,你关注的 @{user_name}刚刚在《{diary_title}》中更新了一篇日记。快去看看 ta 变美了多少~'.format(user_name=user_name, diary_title=diary_title), "extra": { 'type': PUSH_INFO_TYPE.GM_PROTOCOL, 'msgType': 4, 'pushUrl': GmProtocol().get_diary_detail(id=diary_id), 'push_url': GmProtocol().get_diary_detail(id=diary_id), }, "push_type": AUTOMATED_PUSH.FOLLOWED_USER_POSTED_JOURNAL_POSTS, } push_task_to_user_multi(**kwargs) @shared_task def fake_vote_to_topic(): now = datetime.datetime.now() time_delta = datetime.timedelta(days=30) start_date = now.date() - time_delta topics = Problem.objects.using(settings.SLAVE_DB_NAME).filter(last_modified__gte=start_date, votes__isnull=True).values_list('id', flat=True) count = topics.count() topics = list(topics) topics = Problem.objects.filter(id__in=topics) topics_index = random.sample(range(0, count), min(20, count)) for index in topics_index: times = random.randint(1, 3) fake_vote(times, topics[index].id) @shared_task def move_topic_to_answer(): # 非日记帖 同步成问答 num = 100 question_obj = Question.objects.filter(problem_id__isnull=False).order_by('-problem_id').first() if question_obj: max_problem_id = question_obj.problem_id all_problem = Problem.objects.filter(is_online=True, topic_type__in=[TOPIC_TYPE.ASK, TOPIC_TYPE.TOPIC], id__gt=max_problem_id) problems, index = all_problem[0:num], 0 while problems: for problem in problems: convert_question_topic(problem) index += 1 problems = all_problem[index * num: (index + 1) * num] def convert_question_topic(problem): try: problem.stream return except: pass try: content = problem.ask + problem.answer title = re.split("\?|?", content)[0] q = Question.objects.create(user_id=problem.user_id, problem_id=problem.id, title=title[0:100], content=content, create_time=problem.created_time, update_time=problem.created_time) for tag in problem.tags.all(): QuestionTag(question=q, tag=tag.id).save() for image in problem.images.all(): QuestionImage.objects.create(question=q, image_url=image.image_url) print('finish problem id: ' + str(problem.id)) convert_answer_topic_reply(question=q, problem=problem) except Exception as e: print(e) print('error: problem id: ' + str(problem.id)) def convert_answer_topic_reply(question, problem): replys = problem.topicreply_set.filter(is_online=True, commented_reply__isnull=True) for reply in replys: try: answer = Answer.objects.create(user_id=reply.user_id, topicreply_id=reply.id, question=question, content=reply.content, like_num=reply.like_num, create_time=reply.reply_date, update_time=reply.reply_date) for image in reply.images.all(): AnswerImage.objects.create(image_url=image.image_url, answer=answer) print('finish reply id: ' + str(reply.id)) convert_answer_reply_topic_reply(answer, reply) except Exception as e: print(e) print('error: reply id: ' + reply.id) def convert_answer_reply_topic_reply(answer, topicreply): topicreplys = topicreply.comments.filter(is_online=True) for reply in topicreplys: try: answer_reply = AnswerReply.objects.create(content=reply.content, user_id=reply.user_id, answer=answer, create_time=reply.reply_date, update_time=reply.reply_date, topicreply_id=reply.id, like_num=reply.like_num) print('finish reply id: ' + str(reply.id)) except Exception as e: print(e) print('error: reply id ' + reply.id) @shared_task def get_wechat_pgc(offset=0, size=10, run_all=False): wx_client = WxTkApi() access_token = wx_client.get_access_token() wechat_result = wx_client.get_material_list( access_token=access_token, offset=offset, count=size ) material_list = json.loads(wechat_result) create_wechat_ugc(material_list) # get the total count total_count = material_list['total_count'] if not run_all: total_count = offset + 20 while offset < total_count: offset += size access_token = wx_client.get_access_token() wechat_result = wx_client.get_material_list( access_token=access_token, offset=offset, count=size) material_list = json.loads(wechat_result) if not material_list: break create_wechat_ugc(material_list) def create_wechat_ugc(material_list): if not material_list: return user = UserService.get_user_by_user_id(settings.WX_USER_ID) for material in material_list['item']: created_time = datetime.datetime.fromtimestamp(material['update_time']) i = 0 media_id = material['media_id'] for item in material['content']['news_item']: # checkout if material already downloaded material_gmei, result = WechatMaterial.objects.get_or_create(material_id=media_id, number=i) if (result is False and material_gmei.topic_id and material_gmei.update_time == created_time): continue item_content = str(item['content'], "utf-8") if type(item["content"]) == bytes else item["content"] # 数据类型 material_gmei.thumb_media_id = item['thumb_media_id'] material_gmei.thumb_url = item['thumb_url'] material_gmei.show_cover_pic = item['show_cover_pic'] material_gmei.author = item['author'] material_gmei.digest = item['digest'] material_gmei.content = item_content material_gmei.url = item['url'] material_gmei.content_source_url = item['content_source_url'] material_gmei.update_time = created_time material_gmei.title = item['title'] title = item['title'] content = u"{content}".format(content=item_content) soup = BeautifulSoup(content, 'lxml') images = soup.find_all("img") problem, result = Problem.objects.get_or_create( user_id=user.id, ask=title, topic_type=TOPIC_TYPE.WEIXIN_NUM, audit_status=False, is_online=False ) try: for image in images: url = image.attrs['data-src'] if not url: continue # if src has value then continue if image.attrs.get('src', ''): continue if not url: continue data = urlopen(url).read() image_url = upload(image_file=data, img_type=IMG_TYPE.NOWATERMARK) image.attrs['src'] = Picture.get_w_path(image_url) except: logging_exception() # push error material id into the rerun queue redis_key = "wechat_rerun_queue" wechat_cache.lpush(redis_key, problem.id) html_content = str(soup.body).replace("", "").replace("", "") problem.answer = html_content problem.created_time = created_time problem.save() material_gmei.topic_id = problem.id material_gmei.save() i += 1 @shared_task def rerun_pgc(): redis_key = "wechat_rerun_queue" limit = 10 problem_ids = [] while True: for i in range(0, limit): problem_id = wechat_cache.rpop(redis_key) if problem_id: problem_ids.append(problem_id) if not problem_ids: break problems = Problem.objects.filter(id__in=problem_ids) for problem in problems: rerun(problem) def rerun(problem): if not isinstance(problem, Problem): return content = u"{content}".format(content=problem.answer) soup = BeautifulSoup(content, 'lxml') images = soup.find_all("img") try: for image in images: if image.attrs.get('src'): continue url = image.attrs['data-src'] if not url: continue data = urlopen(url).read() image_url = upload(image_file=data, img_type=IMG_TYPE.NOWATERMARK) image.attrs['src'] = Picture.get_w_path(image_url) html_content = str(soup.body).replace("", "").replace("", "") problem.answer = html_content problem.save() except: logging_exception() # push error material id into the rerun queue redis_key = "wechat_rerun_queue" wechat_cache.lpush(redis_key, problem.id) @shared_task def async_diary_change_tags_to_topic(topic_ids, add_tags, del_tags): """ 将日记本变动的标签,同步到帖子上 :param topic_ids: :param add_tags: :param del_tags: :return: """ # 更新需要创建的标签 exist_topic_tag_group = list( ProblemTag.objects.filter(problem_id__in=topic_ids, tag_id__in=add_tags).values_list("problem_id", "tag_id")) topic_tag_group = [(topic_id, tag_id) for tag_id in add_tags for topic_id in topic_ids] can_create_list = list(filter(lambda x: x not in exist_topic_tag_group, topic_tag_group)) ProblemTag.objects.bulk_create( [ProblemTag(problem_id=topic_id, tag_id=tag_id) for topic_id, tag_id in can_create_list]) # 删掉需要剔除的标签 ProblemTag.objects.filter(problem_id__in=topic_ids, tag_id__in=del_tags).delete() @shared_task def add_doctor_topic(topic_ids, user_id): """添加医生关联的日记贴""" topics = Problem.objects.filter(id__in=topic_ids, is_online=True).select_related("diary") if not topics: return service_id = topics[0].diary.service_id if not service_id: return try: doctors = rpc_client["api/doctor/list_by_service_ids"](service_ids=[service_id]).unwrap() doctor_id = doctors.get(str(service_id), {}).get("id") except: logging_exception() return if not doctor_id: return messages = [] for topic in topics: messages.append(DoctorMessageList( user_id=user_id, doctor_id=doctor_id, topic=topic, diary=topic.diary )) DoctorMessageList.objects.bulk_create(messages) @shared_task def applet_topic_replied_push(topic_reply_id): """ 日记帖的评论被评论,给日记帖评论的用户发送小程序推送 自己给自己评论无效 :param topic_reply_id: 发出的那条回复ID :return: """ if not topic_reply_id: return try: reply_info = TopicReply.objects.get(id=topic_reply_id, is_online=True) except TopicReply.DoesNotExist: return if not reply_info.replied_topic_id: return try: replied_info = TopicReply.objects.get(id=reply_info.replied_topic_id) except TopicReply.DoesNotExist: return reply_content = reply_info.content[:20] nickname = UserService.get_user_by_user_id(reply_info.user_id).nickname[:10] data = { "name1": { "value": nickname }, "thing2": { "value": reply_content }, "date3": { "value": "{date}".format(date=datetime.datetime.now().strftime('%Y年%m月%d日 %H:%M')) }, "thing4": { "value": "点击快速查看>>" } } # 模板id template_id = settings.APPLET_SUBSCRIBE_MSG.get("comment", "") # 跳转页面 page = '/packageBBS/pages/topic/detail/detail?topic_id={topic_id}&comment_id={comment_id}' \ '&from={from_page}&from_action={from_action}'.format( topic_id=reply_info.problem_id, comment_id=reply_info.id, from_page=APPLET_PAGE_FROM.CARD, from_action=APPLET_SUBSCRIBE_MSG_TYPE.COMMENT ) # 给一级评论用户发 if reply_info.replied_topic_id != reply_info.commented_reply_id: try: level_1_reply = TopicReply.objects.get(id=reply_info.commented_reply_id) except: level_1_reply = None if level_1_reply: # 不用给自己发 if reply_info.user_id != level_1_reply.user_id: # 发送小程序推送 send_applet_subscribe_msg(level_1_reply.user_id, template_id, data=data, page=page) # 给被评论用户发 自己给自己评论无效 if reply_info.user_id == replied_info.user_id: return # 发送小程序推送 send_applet_subscribe_msg(replied_info.user_id, template_id, data=data, page=page) @shared_task def applet_topic_reply_summary_push(topic_reply_id): """ 用户评论完24小时后, 如果没有收到评论或点赞,则给用户推送日记帖新增一级评论总数,或帖子的相关内容 如果被赞,且被赞数大于1,则推送新增被赞数 :param reply_id: 当前评论id :return: """ if not topic_reply_id: return try: reply_info = TopicReply.objects.get(id=topic_reply_id) except TopicReply.DoesNotExist: return user_id = reply_info.user_id # 用户对日记帖或日记本的新增评论 new_topic_reply = TopicReply.objects.using(settings.SLAVE_DB_NAME).\ filter(id__gt=topic_reply_id, user_id=user_id, is_online=True).exists() if new_topic_reply: return replied_count = TopicReply.objects.using(settings.SLAVE_DB_NAME).\ filter(replied_topic_id=topic_reply_id, is_online=True).exclude(user_id=user_id).count() voted_count = TopicReplyVote.objects.using(settings.SLAVE_DB_NAME).filter(topic_reply_id=topic_reply_id).\ exclude(user_id=user_id).count() problem_content = Problem.objects.filter(id=reply_info.problem_id).first().answer problem_content = _get_content_text(problem_content)[:10] # 当前评论有被评论或被赞 if replied_count or voted_count: return problem_id = reply_info.problem_id user_id = reply_info.user_id # 日记帖帖子新增的一级评论数(不包含自己的) additional_reply_count = TopicReply.objects.using(settings.SLAVE_DB_NAME).\ filter(id__gt=topic_reply_id, problem_id=problem_id, commented_reply__isnull=True, is_online=True).\ exclude(user_id=user_id).count() # 有新增评论,发送24小内新增评论总数 if additional_reply_count: data = { "thing1": { "value": problem_content }, "thing2": { "value": "你评论的帖子,新增{reply_count}条吐槽,立即查看".format(reply_count=additional_reply_count)[:20] } } # 模板id template_id = settings.APPLET_SUBSCRIBE_MSG.get("vote", "") # 跳转页面 page = '/packageBBS/pages/topic/detail/detail?topic_id={topic_id}' \ '&from={from_page}&from_action={from_action}'.format( topic_id=reply_info.problem_id, from_page=APPLET_PAGE_FROM.CARD, from_action=APPLET_SUBSCRIBE_MSG_TYPE.NEW_COMMENT ) # 无新增评论,推送评论的帖子的相同标签下的其它内容详情页 else: data = { "thing2": { "value": problem_content }, "thing4": { "value": "亲!你关注过的话题又有新内容啦>>" } } # 模板id template_id = settings.APPLET_SUBSCRIBE_MSG.get("recommend", "") # 获取日记帖标签 tag_id_list = list(ProblemTag.objects.filter(problem_id=problem_id).values_list('tag_id', flat=True)) # 调策略 获取相关日记本 def get_new_diary_id(rpc_client, offset, tag_id_list): res = rpc_client['doris/search/query_filter_diary'](sort_type=DIARY_ORDER_TYPE.DEFAULT, filters={"tag_ids": tag_id_list}, size=1, offset=offset).unwrap() new_diary_id = res and res.get("diary_ids") and res.get("diary_ids")[0] return new_diary_id rpc_client = get_current_rpc_invoker() offset = random.randint(0, 200) origin_offset = offset new_diary_id = None num = 6 while not new_diary_id and num: try: new_diary_id = get_new_diary_id(rpc_client, offset, tag_id_list) except: new_diary_id = None offset = origin_offset % num if num > 1: offset += random.randint(0, 10) num -= 1 if not new_diary_id: return # 跳转到日记本详情页 page = '/packageBBS/pages/diary/detail/detail?diary_id={diary_id}&from={from_page}&from_action={from_action}'.format( diary_id=new_diary_id, from_page=APPLET_PAGE_FROM.CARD, from_action=APPLET_SUBSCRIBE_MSG_TYPE.RELATED_CONTENT ) # 发送小程序推送 send_applet_subscribe_msg(user_id, template_id, data=data, page=page) @shared_task def topic_reply_voted_applet_push(reply_vote_id): """ 日记帖的评论被点赞,给日记帖评论的用户发送小程序推送 自己给自己点赞无效 超过1赞后不发 :param reply_vote: 评论点赞id :return: """ if not reply_vote_id: return try: reply_vote_info = TopicReplyVote.objects.get(id=reply_vote_id) except TopicReplyVote.DoesNotExist: return try: reply_info = TopicReply.objects.get(id=reply_vote_info.topic_reply_id, is_online=True) except TopicReply.DoesNotExist: return # 自己给自己点赞不用发 if reply_vote_info.user_id == reply_info.user_id: return if reply_info.problem_id: problem_content = Problem.objects.filter(id=reply_info.problem_id).first().answer push_content = _get_content_text(problem_content)[:10] # 跳转页面 page = '/packageBBS/pages/topic/detail/detail?topic_id={problem_id}&comment_id={comment_id}&rom={from_page}&' \ 'from_action={from_action}'.format( problem_id=reply_info.problem_id, comment_id=reply_info.id, from_page=APPLET_PAGE_FROM.CARD, from_action=APPLET_SUBSCRIBE_MSG_TYPE.VOTE ) else: # 日记本评论功能 APP以去除 return # push_content = reply_info.diary.title[:10] # # 跳转到日记本详情页 TODO 确认日记本评论的key # page = '/packageBBS/pages/diary/detail/detail?diary_id={diary_id}&topic_reply={topic_reply}&from={from_page}&from_action={from_action}'.format( # diary_id=reply_info.diary_id, # topic_reply=reply_info.id, # from_page=APPLET_PAGE_FROM.CARD, # from_action=APPLET_SUBSCRIBE_MSG_TYPE.VOTE # ) data = { "thing1": { "value": push_content }, "thing2": { "value": "你的评论收到了新的支持,快来看看吧>>" } } # 模板id template_id = settings.APPLET_SUBSCRIBE_MSG.get("vote", "") # 发送小程序推送 send_applet_subscribe_msg(reply_info.user_id, template_id, data=data, page=page)