# diary_content_check.py 4.42 KB
#coding:utf8
import json
import math
from multiprocessing import Pool, Manager

from bs4 import BeautifulSoup
from django.db.models import Max
from django import db
from django.conf import settings
from django.core.management import BaseCommand

from gm_types.gaia import DIARY_AUDIT_STATUS, DIARY_CONTENT_LEVEL
from gm_types.mimas import SPAM_LABEL, SPAM_EVIL_FLAG, SPAM_SUGGEST, GRABBING_PLATFORM, CONTENT_CLASS

from talos.models.diary import Diary
from talos.models.topic import Problem
from utils.rpc import get_rpc_invoker
from utils.common import gm_decode_html


# Module-level RPC invoker; antispam_check() uses it to call 'antispam/text/check'.
rpc_client = get_rpc_invoker()
# Page size: maximum rows read per worker task, and the divisor used to derive
# the number of tasks from the total row count.
limit = 500


def antispam_check(text):
    """Classify *text* through the ``antispam/text/check`` RPC.

    Returns a ``(label, keywords)`` tuple.  ``label`` is ``SPAM_LABEL.SPAM``
    only when the response carries a non-normal ``evil`` flag together with a
    ``REVIEW`` or ``BLOCK`` suggestion; ``keywords`` is then the ``Keywords``
    entry of the response's ``data`` mapping.  In every other case (empty
    text, failed RPC call, normal or unrecognised verdict) the result is
    ``(SPAM_LABEL.NORMAL, [])``.
    """
    if not text:
        return SPAM_LABEL.NORMAL, []

    try:
        data = rpc_client['antispam/text/check'](text=text).unwrap()
    except Exception:
        # Best effort: a failed check is treated as "not spam".  Catching
        # Exception (not a bare ``except``) lets KeyboardInterrupt/SystemExit
        # still stop the worker.
        return SPAM_LABEL.NORMAL, []

    evil = data.get('evil', 0)
    suggest = data.get('suggest', "Normal")
    # Tolerate a null ``data``/``Keywords`` entry instead of raising.
    keywords = (data.get('data') or {}).get('Keywords') or []

    if evil == SPAM_EVIL_FLAG.NORMAL:
        return SPAM_LABEL.NORMAL, []

    # REVIEW and BLOCK are both reported as spam.
    if suggest in (SPAM_SUGGEST.REVIEW, SPAM_SUGGEST.BLOCK):
        return SPAM_LABEL.SPAM, keywords

    return SPAM_LABEL.NORMAL, []


def update_diary(queue):
    """Scan one page of diaries and report those whose title is flagged as spam.

    ``queue`` holds a single id cursor shared by all worker tasks.  This task
    takes the cursor, reads up to ``limit`` matching diaries with a larger id
    from the slave database, and puts the advanced cursor back before running
    the anti-spam checks.

    Only online diaries whose content level is UNAUDITED, ILLEGAL or BAD are
    scanned.  Flagged diaries are printed; nothing is written to the database.
    """
    start_id = queue.get()
    print(start_id)

    next_id = start_id
    try:
        # Keyset pagination: filter on the cursor and order by id so that
        # consecutive tasks see consecutive, non-overlapping pages.
        diaries = list(
            Diary.objects.using(settings.SLAVE_DB_NAME).filter(
                pk__gt=start_id,
                is_online=True,
                content_level__in=[
                    DIARY_CONTENT_LEVEL.UNAUDITED,
                    DIARY_CONTENT_LEVEL.ILLEGAL,
                    DIARY_CONTENT_LEVEL.BAD,
                ],
            ).order_by('id')[:limit]
        )
        if diaries:
            next_id = diaries[-1].id
    finally:
        # Always hand a valid cursor back (the old one when the page is empty
        # or the query failed); otherwise the tasks waiting on queue.get()
        # would block forever.
        queue.put(next_id)

    if not diaries:
        return

    diary_ids = []
    for diary in diaries:
        label, keywords = antispam_check(diary.title)
        if label != SPAM_LABEL.SPAM:
            continue
        diary_ids.append(diary.id)
        # First line is JSON (json.dumps escapes non-ASCII by default); the
        # second is the plain dict repr of the same record.
        print(json.dumps({"type": "diary", "id": diary.id, "content": diary.title, "keywords": keywords}))
        print({"type": "diary", "look": True, "id": diary.id, "content": diary.title, "keywords": keywords})

    # Dry run: flagged diaries are only reported.  To take them offline, enable:
    # Diary.objects.filter(pk__in=diary_ids).update(is_online=False)


def update_topic(queue):
    """Scan one page of diary topics and report those whose text is flagged as spam.

    ``queue`` holds a single id cursor shared by all worker tasks.  This task
    takes the cursor, reads up to ``limit`` matching topics with a larger id
    from the slave database, and puts the advanced cursor back before running
    the anti-spam checks.

    Only online topics attached to a diary whose content level is UNAUDITED,
    ILLEGAL or BAD are scanned.  The checked text is ``topic.content`` with
    markup, newlines and spaces stripped.  Flagged topics are printed; nothing
    is written to the database.
    """
    start_id = queue.get()
    print(start_id)

    next_id = start_id
    try:
        # Order by id so the pk__gt cursor yields consecutive, non-overlapping
        # pages across tasks.
        topics = list(
            Problem.objects.using(settings.SLAVE_DB_NAME).filter(
                diary_id__isnull=False, pk__gt=start_id, is_online=True,
                diary__content_level__in=[
                    DIARY_CONTENT_LEVEL.UNAUDITED,
                    DIARY_CONTENT_LEVEL.ILLEGAL,
                    DIARY_CONTENT_LEVEL.BAD,
                ],
            ).order_by('id')[:limit]
        )
        if topics:
            next_id = topics[-1].id
    finally:
        # Always hand a valid cursor back (the old one when the page is empty
        # or the query failed); a missing or None cursor would break or block
        # the tasks that follow.
        queue.put(next_id)

    if not topics:
        return

    topic_ids = []
    for topic in topics:
        soup = BeautifulSoup(topic.content)
        content_text = soup.get_text().replace("\n", '').replace(" ", '')
        label, keywords = antispam_check(content_text)
        if label != SPAM_LABEL.SPAM:
            continue
        topic_ids.append(topic.id)
        # NOTE(review): the check runs on topic.content but the report prints
        # topic.answer -- confirm this difference is intended.
        print(json.dumps({"type": "topic", "id": topic.id, "content": topic.answer, "keywords": keywords}))
        print({"type": "topic", "look": True, "id": topic.id, "keywords": keywords})

    # Dry run: flagged topics are only reported.  To take them offline, enable:
    # Problem.objects.filter(pk__in=topic_ids).update(is_online=False)


class Command(BaseCommand):
    """Management command that runs the anti-spam scan over diaries, then topics."""

    # Upper bound on worker processes; also the capacity of the cursor queue.
    processes = 20

    def start(self, count, processor):
        """Run ``processor`` in a process pool, once per page of ``limit`` rows.

        ``count`` is the total number of rows to cover.  Every task receives
        the same managed queue, which is seeded with ``0`` as the initial id
        cursor for the workers to pass along.
        """
        print(count)

        task_count = int(math.ceil(count / limit))
        if task_count <= 0:
            # Nothing to scan: skip spawning the manager and the pool.
            return

        # Context managers make sure the manager process and the pool are
        # released even when a worker raises.
        with Manager() as manager:
            queue = manager.Queue(maxsize=self.processes)
            queue.put(0)  # seed cursor that lets the first task start

            # Close DB connections before the pool is created; each worker
            # then opens its own.
            db.connections.close_all()
            with Pool(processes=min(self.processes, task_count)) as pool:
                pool.starmap(processor, [(queue,)] * task_count)
                pool.close()
                pool.join()

    def start_diary(self):
        """Count the diaries eligible for scanning and fan out ``update_diary``."""
        count = Diary.objects.using(settings.SLAVE_DB_NAME).filter(
            is_online=True,
            content_level__in=[DIARY_CONTENT_LEVEL.UNAUDITED, DIARY_CONTENT_LEVEL.ILLEGAL, DIARY_CONTENT_LEVEL.BAD]
        ).count()
        self.start(count, update_diary)

    def start_topic(self):
        """Count the diary topics eligible for scanning and fan out ``update_topic``."""
        count = Problem.objects.using(settings.SLAVE_DB_NAME).filter(
            diary_id__isnull=False, is_online=True,
            diary__content_level__in=[DIARY_CONTENT_LEVEL.UNAUDITED, DIARY_CONTENT_LEVEL.ILLEGAL, DIARY_CONTENT_LEVEL.BAD]
        ).count()
        self.start(count, update_topic)

    def handle(self, *args, **options):
        """Entry point: scan diaries first, then topics."""
        self.start_diary()
        self.start_topic()