# update_replies_by_org.py 5.26 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import math
import time
from multiprocessing import Pool, Manager

from django.core.management.base import BaseCommand
from django.db.models import Q, Max
from django import db

from settings import settings
from talos.models.diary import Diary, DiaryExtra
from talos.models.topic.topic import Problem as Topic
from talos.models.topic.topicreply import TopicReply
from talos.tools.hospital_tool import HospitalTool


# Number of topics read per batch; create_process_pool schedules one pool task per batch.
SQL_LIMIT = 500
# Number of worker processes in the pool (also the maxsize of the shared relay queue).
PROCESS_NUM = 4


def get_replies(model, reply_model, queue, limit):
    """Fetch the next batch of topics and return the replies to take offline.

    Workers share ``queue`` as a relay token. Each worker takes the last
    processed topic id from it and reads the next ``limit`` topics from the
    slave DB, ordered by id and restricted to online topics that have a diary.
    It then immediately puts the batch's highest id back so the next worker
    can start. The token ``'end'`` (or ``None``) means there is nothing left.
    It is passed on so that every remaining worker also stops instead of
    blocking on ``queue.get()``.

    Args:
        model: topic model to page through. Only ``Topic`` instances are
            matched to replies.
        reply_model: reply model, filtered with ``problem_id=topic.id``.
        queue: shared queue holding the relay token.
        limit: batch size.

    Returns:
        A list of online replies for which
        ``HospitalTool.in_one_hospital(topic, reply.user_id)`` is falsy.
        Returns ``None`` when this worker had no batch to process.
    """
    # Fetch replies.
    start_id = queue.get()
    if start_id is None or start_id == 'end':
        queue.put('end')
        return None

    next_token = 'end'
    try:
        # Explicit id ordering is required for keyset pagination: without it
        # the slice may not be the lowest ids above start_id, and rows would
        # be skipped when the next batch starts after this batch's max id.
        topics = list(
            model.objects.using(settings.SLAVE_DB_NAME)
            .filter(diary__isnull=False, id__gt=start_id, is_online=True)
            .order_by('id')[:limit]
        )
        if topics:
            next_token = topics[-1].id
    finally:
        # Always hand a token on. Otherwise, when the batch is empty or the
        # query raises, every later pool task waits on queue.get() forever.
        queue.put(next_token)

    if not topics:
        return None
    print('current start_id is : {}'.format(next_token))

    need_update_replies = []
    for topic in topics:
        if not isinstance(topic, Topic):
            # Only Topic has a known reply filter (problem_id). Filtering with
            # an empty Q() would match every reply in the table.
            continue
        topic_replies = reply_model.objects.using(settings.SLAVE_DB_NAME).filter(
            is_online=True, problem_id=topic.id)
        need_update_replies.extend(
            reply for reply in topic_replies
            if not HospitalTool.in_one_hospital(topic, reply.user_id)
        )
    return need_update_replies


def _take_children_offline(reply):
    """Take the online child replies of ``reply`` offline and return how many changed.

    For a reply without ``commented_reply``, the children are the replies
    whose ``commented_reply`` is this reply. Otherwise, they are the replies
    whose ``replied_topic_id`` equals this reply's id.
    """
    if not reply.commented_reply:
        children = TopicReply.objects.filter(commented_reply=reply, is_online=True)
    else:
        children = TopicReply.objects.filter(replied_topic_id=reply.id, is_online=True)
    # update() reports the rows it changed on the write DB. The count therefore
    # matches exactly what was taken offline; counting on the slave could lag.
    return children.update(is_online=False)


def _decrease_reply_counts(topic_id, change_num):
    """Subtract ``change_num`` from the topic's and its diary's ``reply_num``.

    Afterwards, sync ``DiaryExtra.reply_count`` to the diary's new value.
    A missing topic, diary or diary-extra row is skipped (abnormal-data
    tolerance) instead of raising.
    """
    topic = Topic.get_by_id(topic_id)
    if not topic:
        return
    # NOTE(review): the updates below are read-modify-write. If topics handled
    # by different worker processes share a diary, concurrent decrements can
    # be lost. Consider F() expressions.
    topic.reply_num -= change_num
    topic.save(update_fields=["reply_num"])

    if not topic.diary_id:
        return
    diary = Diary.objects.filter(pk=topic.diary_id).first()
    if diary is None:
        return
    diary.reply_num -= change_num
    diary.save(update_fields=["reply_num"])

    diary_extra = DiaryExtra.objects.filter(diary_id=diary.id).first()
    if diary_extra and diary_extra.reply_count != diary.reply_num:
        diary_extra.reply_count = diary.reply_num
        diary_extra.save(update_fields=["reply_count"])


def update_topic_reply_off_line(topic_reply_list, model):
    """Take each reply in ``topic_reply_list`` offline and fix the reply counters.

    For every online reply that is an instance of ``model``:
      1. Re-read it with ``TopicReply.get_by_id``. The original comment
         describes this as a master/slave switch, i.e. the slave copy in the
         list is not trusted. Skip the reply if it is gone or already offline;
         it may have just been taken offline as the child of an earlier reply
         in the same list.
      2. Take its child replies offline.
      3. Take the reply itself offline.
      4. Subtract ``1 + <children taken offline>`` from the topic's and the
         diary's ``reply_num``, then resync ``DiaryExtra.reply_count``.

    A failure is printed for that reply and does not stop the loop.
    """
    if not topic_reply_list:
        return
    for candidate in topic_reply_list:
        if not isinstance(candidate, model) or not candidate.is_online:
            continue
        # Keep the id separately so the error report works even if the
        # re-read below returns None.
        reply_id = candidate.id
        try:
            # Master/slave switch: re-read instead of trusting the slave copy.
            reply = TopicReply.get_by_id(reply_id)
            if not reply or not reply.is_online:
                continue

            change_num = 1 + _take_children_offline(reply)

            reply.is_online = False
            reply.save(update_fields=["is_online"])

            _decrease_reply_counts(reply.problem_id, change_num)
        except Exception as e:
            print('{} update topic reply failed, id is : '.format(
                model.__name__), reply_id)
            print(e)


def process_topic(queue_topic, limit):
    """Pool task: collect one batch of Topic replies and take them offline.

    ``queue_topic`` is the shared relay queue and ``limit`` is the batch size.
    Both are forwarded unchanged to ``get_replies``.
    """
    batch = get_replies(Topic, TopicReply, queue_topic, limit)
    update_topic_reply_off_line(batch, TopicReply)


def create_process_pool(model, func):
    """Prompt for a start id and run ``func`` over ``model`` in parallel batches.

    One pool task ``func(queue, SQL_LIMIT)`` is scheduled per ``SQL_LIMIT``
    rows of ``model`` with an id greater than the start id. The tasks
    coordinate through a shared manager queue seeded with the start id.

    Raises:
        ValueError: if the entered start id is not an integer.
    """
    # Validate up front: a non-numeric id would otherwise surface later as an
    # obscure error inside the DB lookup.
    start_id = int(input('please input the start id: '))

    # The context manager shuts the manager process down when the pool is done.
    with Manager() as manager:
        queue = manager.Queue(maxsize=PROCESS_NUM)
        queue.put(start_id)

        count = model.objects.filter(id__gt=start_id).count()
        # Integer ceiling division: one task per (possibly partial) batch.
        cnt = (count + SQL_LIMIT - 1) // SQL_LIMIT
        print('   cnt :', cnt)

        args_list = [(queue, SQL_LIMIT)] * cnt

        # Close inherited DB connections so forked workers open their own.
        db.connections.close_all()
        pool = Pool(processes=PROCESS_NUM)
        try:
            pool.starmap(func, args_list)
        finally:
            pool.close()
            pool.join()


class Command(BaseCommand):
    """Management command that runs ``process_topic`` over ``Topic`` in a process pool."""

    def handle(self, *args, **options):
        """Run the batch job and print start, end and elapsed wall-clock time."""
        print("BEGIN")
        began = time.time()
        print("start at: ", began)

        create_process_pool(Topic, process_topic)

        finished = time.time()
        print("end at: ", finished)
        elapsed = finished - began
        print('total use {} s.'.format(elapsed))
        print("END")