# coding: utf-8
"""Scan online diaries/topics whose content level is still unaudited, illegal,
or bad, run their text through the antispam RPC, and print the items the
service flags as spam. The statements that would actually take content
offline are left commented out, so the command runs as a dry run."""
import json
import math
from multiprocessing import Pool, Manager

from bs4 import BeautifulSoup
from django import db
from django.conf import settings
from django.core.management import BaseCommand
from gm_types.gaia import DIARY_CONTENT_LEVEL
from gm_types.mimas import SPAM_LABEL, SPAM_EVIL_FLAG, SPAM_SUGGEST

from talos.models.diary import Diary
from talos.models.topic import Problem
from utils.rpc import get_rpc_invoker

rpc_client = get_rpc_invoker()
limit = 500  # rows fetched per worker round

# Content levels that still need a spam check.
BAD_LEVELS = [DIARY_CONTENT_LEVEL.UNAUDITED, DIARY_CONTENT_LEVEL.ILLEGAL, DIARY_CONTENT_LEVEL.BAD]


def antispam_check(text):
    """Return (label, keywords) for ``text`` from the antispam service.

    Empty text and RPC failures both fall back to NORMAL."""
    if not text:
        return SPAM_LABEL.NORMAL, []
    try:
        data = rpc_client['antispam/text/check'](text=text).unwrap()
    except Exception:
        return SPAM_LABEL.NORMAL, []
    evil = data.get('evil', 0)
    suggest = data.get('suggest', "Normal")
    keywords = data.get('data', {}).get('Keywords', [])
    if evil == SPAM_EVIL_FLAG.NORMAL:
        return SPAM_LABEL.NORMAL, []
    if suggest in (SPAM_SUGGEST.REVIEW, SPAM_SUGGEST.BLOCK):
        return SPAM_LABEL.SPAM, keywords
    return SPAM_LABEL.NORMAL, []


def update_diary(queue):
    # Workers share a cursor (the last processed id) through the queue.
    start_id = queue.get()
    print(start_id)
    diaries = list(
        Diary.objects.using(settings.SLAVE_DB_NAME).filter(
            pk__gt=start_id,
            is_online=True,
            content_level__in=BAD_LEVELS,
        ).order_by('pk')[:limit]
    )
    if not diaries:
        queue.put(start_id)  # nothing left; hand the cursor back so later workers can exit too
        return
    queue.put(diaries[-1].id)  # advance the cursor for the next worker
    diary_ids = []
    for diary in diaries:
        label, keywords = antispam_check(diary.title)
        if label == SPAM_LABEL.SPAM:
            diary_ids.append(diary.id)
            # One machine-readable JSON line plus one plain dict for eyeballing.
            print(json.dumps({"type": "diary", "id": diary.id, "content": diary.title, "keywords": keywords}))
            print({"type": "diary", "look": True, "id": diary.id, "content": diary.title, "keywords": keywords})
    if diary_ids:
        pass  # dry run: re-enable the update below to take flagged diaries offline
        # Diary.objects.filter(pk__in=diary_ids).update(is_online=False)


def update_topic(queue):
    start_id = queue.get()
    print(start_id)
    topics = list(
        Problem.objects.using(settings.SLAVE_DB_NAME).filter(
            diary_id__isnull=False,
            pk__gt=start_id,
            is_online=True,
            diary__content_level__in=BAD_LEVELS,
        ).order_by('pk')[:limit]
    )
    if not topics:
        queue.put(start_id)
        return
    queue.put(topics[-1].id)
    topic_ids = []
    for topic in topics:
        # Strip markup and whitespace before sending the text to antispam.
        soup = BeautifulSoup(topic.content, "html.parser")
        content_text = soup.get_text().replace("\n", '').replace(" ", '')
        label, keywords = antispam_check(content_text)
        if label == SPAM_LABEL.SPAM:
            topic_ids.append(topic.id)
            print(json.dumps({"type": "topic", "id": topic.id, "content": content_text, "keywords": keywords}))
            print({"type": "topic", "look": True, "id": topic.id, "keywords": keywords})
    if topic_ids:
        pass  # dry run: re-enable the update below to take flagged topics offline
        # Problem.objects.filter(pk__in=topic_ids).update(is_online=False)


class Command(BaseCommand):
    processes = 20

    def start(self, count, processor):
        print(count)
        queue = Manager().Queue(maxsize=self.processes)
        queue.put(0)  # seed the cursor so the first worker starts from id 0
        args_list = []
        cnt = int(math.ceil(count / limit))
        for _ in range(cnt):
            args_list.append((queue,))
        # Close inherited DB connections so each forked worker opens its own.
        db.connections.close_all()
        pool = Pool(processes=self.processes)
        pool.starmap(processor, args_list)
        pool.close()
        pool.join()

    def start_diary(self):
        count = Diary.objects.using(settings.SLAVE_DB_NAME).filter(
            is_online=True,
            content_level__in=BAD_LEVELS,
        ).count()
        self.start(count, update_diary)

    def start_topic(self):
        count = Problem.objects.using(settings.SLAVE_DB_NAME).filter(
            diary_id__isnull=False,
            is_online=True,
            diary__content_level__in=BAD_LEVELS,
        ).count()
        self.start(count, update_topic)

    def handle(self, *args, **options):
        self.start_diary()
        self.start_topic()
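
# Usage sketch. The command name below is hypothetical: it comes from this
# file's name under <app>/management/commands/, which isn't shown here.
#   python manage.py spam_recheck
# handle() scans all qualifying diaries first, then topics, printing one JSON
# line per item the antispam service flags, while the update statements above
# stay commented out.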