# coding: utf-8
"""Scan online tractates (and optionally their replies) with the antispam text check
and print the ids/keywords of suspected spam; the actual take-down update is left
commented out so the command runs as a dry run."""
import json
import math
from multiprocessing import Manager, Pool

from bs4 import BeautifulSoup
from django import db
from django.conf import settings
from django.core.management import BaseCommand
from django.db.models import Max

from gm_types.mimas import (
    SPAM_EVIL_FLAG,
    SPAM_LABEL,
    SPAM_SUGGEST,
    TRACTATE_CONTENT_LEVEL,
    TRACTATE_PLATFORM,
)
from talos.models.tractate import Tractate, TractateReply
from utils.rpc import get_rpc_invoker

rpc_client = get_rpc_invoker()
limit = 500  # batch size per worker task


def antispam_check(text):
    """Run text through the antispam RPC and return (label, hit keywords)."""
    if not text:
        return SPAM_LABEL.NORMAL, []
    try:
        data = rpc_client['antispam/text/check'](text=text).unwrap()
    except Exception:  # treat RPC failures as normal so the scan keeps going
        return SPAM_LABEL.NORMAL, []

    evil = data.get('evil', 0)
    suggest = data.get('suggest', "Normal")
    keywords = data.get('data', {}).get('Keywords', [])

    if evil == SPAM_EVIL_FLAG.NORMAL:
        return SPAM_LABEL.NORMAL, []

    if suggest == SPAM_SUGGEST.NORMAL:
        return SPAM_LABEL.NORMAL, []
    elif suggest in (SPAM_SUGGEST.REVIEW, SPAM_SUGGEST.BLOCK):
        return SPAM_LABEL.SPAM, keywords

    return SPAM_LABEL.NORMAL, []


def update_tractate(queue):
    """Check one batch of tractates whose pk is greater than the id taken from the queue."""
    start_id = queue.get()
    print(start_id)
    tractates = Tractate.objects.using(settings.SLAVE_DB_NAME).filter(
        platform=TRACTATE_PLATFORM.GM,
        is_online=True,
        pk__gt=start_id,
        content_level__in=[TRACTATE_CONTENT_LEVEL.BAD, TRACTATE_CONTENT_LEVEL.GENERAL],
    ).order_by('pk')[:limit]  # order by pk so pk__gt batching walks the table deterministically
    # Hand the highest id of this batch to the next worker; fall back to start_id
    # so an empty batch never pushes None (which would break pk__gt) onto the queue.
    max_id = tractates.aggregate(max_id=Max('id'))["max_id"]
    queue.put(max_id if max_id is not None else start_id)

    if not tractates:
        return

    tractate_ids = []
    for tractate in tractates:
        # Strip HTML markup and whitespace before sending the text to antispam.
        soup = BeautifulSoup(tractate.content, "html.parser")
        content_text = soup.get_text().replace("\n", '').replace(" ", '')
        label, keywords = antispam_check(content_text)
        if label == SPAM_LABEL.SPAM:
            tractate_ids.append(tractate.id)
            print({"type": "tractate", "look": True, "id": tractate.id, "keywords": keywords})
            print(json.dumps({"id": tractate.id, "content": content_text, "keywords": keywords}))

    if tractate_ids:
        # Dry run: the actual take-down is intentionally left commented out.
        pass
        # Tractate.objects.filter(pk__in=tractate_ids).update(is_online=False)


def update_tractate_reply(queue):
    """Check one batch of tractate replies whose pk is greater than the id taken from the queue."""
    start_id = queue.get()
    print(start_id)
    replies = TractateReply.objects.using(settings.SLAVE_DB_NAME).filter(
        is_fake=False,
        pk__gt=start_id,
        is_online=True,
    ).order_by('pk')[:limit]
    max_id = replies.aggregate(max_id=Max('id'))["max_id"]
    queue.put(max_id if max_id is not None else start_id)

    if not replies:
        return

    reply_ids = []
    for reply in replies:
        label, keywords = antispam_check(reply.content)
        if label == SPAM_LABEL.SPAM:
            reply_ids.append(reply.id)
            print({"look": True, "id": reply.id, "content": reply.content, "keywords": keywords})
            print(json.dumps({"id": reply.id, "content": reply.content, "keywords": keywords}))

    if reply_ids:
        # Dry run: the actual take-down is intentionally left commented out.
        pass
        # TractateReply.objects.filter(pk__in=reply_ids).update(is_online=False)


class Command(BaseCommand):
    processes = 20

    def start(self, count, processor):
        queue = Manager().Queue(maxsize=self.processes)
        queue.put(0)  # seed the queue so the first worker starts scanning from id 0
        # One task per batch of `limit` rows.
        cnt = int(math.ceil(count / limit))
        args_list = [(queue,) for _ in range(cnt)]

        # Close inherited DB connections before forking so each worker opens its own.
        db.connections.close_all()
        pool = Pool(processes=self.processes)
        pool.starmap(processor, args_list)
        pool.close()
        pool.join()

    def start_tractate(self):
        count = Tractate.objects.using(settings.SLAVE_DB_NAME).filter(
            platform=TRACTATE_PLATFORM.GM,
            is_online=True,
            content_level__in=[TRACTATE_CONTENT_LEVEL.BAD, TRACTATE_CONTENT_LEVEL.GENERAL],
        ).count()
        self.start(count, update_tractate)

    def start_tractate_reply(self):
        count = TractateReply.objects.using(settings.SLAVE_DB_NAME).filter(
            is_fake=False,
            is_online=True,
        ).count()
        self.start(count, update_tractate_reply)

    def handle(self, *args, **options):
        self.start_tractate()
        # self.start_tractate_reply()
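
# Usage sketch (an assumption: a Django management command is invoked by its module
# filename, which is not shown here; "check_tractate_spam" is a hypothetical placeholder):
#
#   python manage.py check_tractate_spam
#
# As written, handle() only scans tractates, not replies, and runs in dry-run mode:
# suspected spam is printed, while the .update(is_online=False) calls stay commented out.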