# coding:utf-8
"""Export per-diary statistics of users' pre-operation and post-operation images."""
import time
import math
from datetime import datetime
from multiprocessing import Pool, Queue, Manager

from django.core.management.base import BaseCommand
from django.db.models import Q, Count
from django.conf import settings
from django import db

from talos.models.diary.diaryextra import DiaryExtra
from talos.models.topic import Problem, TopicImage
from talos.models.diary.preoperationimage import PreOperationImage
from utils.execel import ExcelWriter, ExcelReader

res_file = 'out.xlsx'  # output workbook path
page_num = 200  # diaries per page (one pool task per page)
processes = 5  # number of worker processes


class Command(BaseCommand):
    help = '导出用户术前术后图片统计'

    def handle(self, *args, **options):
        """Export one row per diary: (diary id, pre-op image count, post-op image count).

        Diaries are split into pages of ``page_num``. Each page is handled by a
        pool worker. A single token passed through a shared queue makes the
        workers append their rows to ``res_file`` in page order.
        """
        print('------ starting -----')
        start_time = time.time()
        print("start at: ", start_time)

        excel = ExcelWriter(res_file)
        excel.write_header(["日记本ID", "术前图", "术后图"])
        excel.save()

        # One manager serves both the queue and the lock. The ``with`` shuts its
        # server process down when the export is finished.
        with Manager() as manager:
            # The queue holds exactly one token: the ``start`` offset of the page
            # whose turn it is to write. This keeps the Excel rows in page order.
            queue = manager.Queue(maxsize=1)
            lock = manager.Lock()
            queue.put(0)  # the page at offset 0 writes first

            count = get_user_diaries_count()
            print(count)
            pages = int(math.ceil(count / float(page_num)))
            print("pages = ", pages)

            args_list = []
            for page in range(pages):
                start = page * page_num
                diaries_ids = get_user_diaries(start)
                args_list.append((queue, lock, diaries_ids, start))

            # Close the parent's DB connections before forking the pool so the
            # workers open their own connections instead of sharing these.
            db.connections.close_all()

            pool = Pool(processes=processes)
            try:
                pool.starmap(handle_result, args_list)
            finally:
                pool.close()
                pool.join()

        end_time = time.time()
        print("end at: ", end_time)
        print('total use {} s.'.format(end_time - start_time))
        print('Done!')


def handle_result(queue, lock, diaries_ids, start):
    """Compute image counts for one page of diaries and append them to the file.

    Args:
        queue: shared queue holding the single ordering token (a page offset).
        lock: shared lock guarding the get/put of the token.
        diaries_ids: diary ids of this page.
        start: offset of this page; this worker writes when the token equals it.

    Any exception raised while building the rows is re-raised only after this
    page has taken its turn. Otherwise later pages would wait for the token
    forever.
    """
    print(start // page_num)
    rows = []
    try:
        rows = _build_rows(diaries_ids)
    finally:
        _write_in_turn(queue, lock, start, rows)


def _build_rows(diaries_ids):
    """Return ``[diary_id, before, after]`` rows, keeping the order of ``diaries_ids``."""
    diary_img_count = get_diary_img_count(diaries_ids)
    rows = []
    for diary_id in diaries_ids:
        item = diary_img_count.get(diary_id)
        if item is None:
            # Diary has neither pre-op images nor online topics: not exported.
            continue
        rows.append([diary_id, item.get("before", 0), item.get("after", 0)])
    return rows


def _write_in_turn(queue, lock, start, rows):
    """Wait until the token equals ``start``, append ``rows``, then pass the token on."""
    while True:
        with lock:
            token = queue.get()
            if token == start:
                try:
                    if rows:
                        excel = ExcelReader(res_file)
                        excel.write_rows(excel.row_number + 1, rows)
                        excel.save()
                finally:
                    # Always hand the turn to the next page, even if writing failed.
                    queue.put(start + page_num)
                return
            # Not our turn: put the token back for the page it belongs to.
            queue.put(token)
        time.sleep(1)


def _user_diaries_queryset():
    """DiaryExtra rows of diaries to export.

    Selects diaries that are online, have content_level 3, are not sunk, were
    modified after 2017-01-01, and have at least one reply. Read from the slave DB.
    """
    return DiaryExtra.objects.using(settings.SLAVE_DB_NAME).filter(
        Q(diary__is_online=True)
        & Q(diary__content_level=3)
        & Q(reply_count__gt=0)
        & Q(diary__last_modified__gt=datetime(2017, 1, 1))
        & Q(diary__is_sink=False)
    )


def get_user_diaries_count():
    """Return the number of diaries to export (distinct DiaryExtra rows)."""
    return _user_diaries_queryset().distinct().count()


def get_user_diaries(start):
    """Return the list of diary ids for the page beginning at offset ``start``.

    Ordered by reply count, descending. ``diary_id`` is a tiebreaker so that
    OFFSET/LIMIT paging over equal reply counts is deterministic and pages
    neither overlap nor skip diaries.
    """
    diaries_ids = _user_diaries_queryset().values_list(
        "diary_id", flat=True
    ).distinct().order_by("-reply_count", "diary_id")[start: start + page_num]
    return list(diaries_ids)


def get_diary_img_count(diary_ids):
    """Count pre-op and post-op images for the given diaries.

    Returns:
        dict mapping diary_id -> {"before": int, "after": int}, where each key
        is present only when it applies:
        - "before" is the number of PreOperationImage rows of the diary, for
          diaries that have any.
        - "after" is the total number of TopicImage rows over the diary's online
          topics, for diaries with at least one online topic (may be 0).
        Diaries with neither are absent from the dict.
    """
    result = {}

    # Pre-operation images, counted per diary. The empty order_by() keeps any
    # default model ordering out of the GROUP BY.
    before_counts = PreOperationImage.objects.using(settings.SLAVE_DB_NAME).filter(
        diary_id__in=diary_ids
    ).values('diary_id').annotate(total=Count('diary_id')).order_by().iterator()
    for item in before_counts:
        result.setdefault(item["diary_id"], {})["before"] = item["total"]

    # Post-operation images: map each online topic to its diary first.
    topic_to_diary = {}
    topics = Problem.objects.using(settings.SLAVE_DB_NAME).filter(
        diary_id__in=diary_ids, is_online=True
    ).values('id', "diary_id").iterator()
    for topic in topics:
        topic_to_diary[topic["id"]] = topic["diary_id"]
        # Every diary with an online topic gets an "after" count, even if 0.
        result.setdefault(topic["diary_id"], {})["after"] = 0

    if topic_to_diary:
        topic_imgs = TopicImage.objects.using(settings.SLAVE_DB_NAME).filter(
            topic_id__in=list(topic_to_diary)
        ).values('topic_id').annotate(total=Count('topic_id')).order_by()
        for img in topic_imgs:
            result[topic_to_diary[img["topic_id"]]]["after"] += img["total"]

    return result