# coding:utf-8
import re
import os
import os.path
import json
import traceback
from django.conf import settings
from django.core.management import BaseCommand, CommandError
from gm_types.mimas import GRABBING_PLATFORM, QUESTION_TYPE, QA_CONTENT_TYPE, \
MEDIA_IMAGE_URL_SOURCE, VIDEO_SOURCE_TYPE, IMAGE_TYPE
from qa.models import Question, Answer, AnswerImage
from qa.libs import get_media_extra_info
from hera.queries.qa import get_media_list_from_content
from qa.serializers import QuestionSerializer, AnswerSerializer, QuestionImage
from live.tasks import get_qiniu_persistent_ids
from utils.common import get_data_from_rich_text
from utils.execel import ExcelWriter
from utils.rpc import logging_exception
from qa.utils.image import handle_image_type
from qa.tasks.view import gif_create_webp
class Command(BaseCommand):
    """Import scraped Zhihu questions and answers from JSON-lines files.

    The directory given with ``-p/--path`` is walked recursively. Files are
    picked up by file-name prefix, every line of a file is parsed as one JSON
    object, and the object is stored as an offline (``is_online=False``)
    ``Question`` or ``Answer`` together with its image rows. All question
    files are processed before any answer file, because an answer is linked
    to its question through the question's ``platform_id``.
    """

    # File-name prefixes used to tell answer dumps from question dumps.
    ANSWER_FILE_PREFIX = "zhihu_spider_data_for_query_word_"
    QUESTION_FILE_PREFIX = "zhihu_spider_question_data_for_query_word_"

    # Stock opening phrases removed from scraped text. Variants that carry
    # punctuation come before the bare phrase so the punctuation is removed
    # together with it. Both the ASCII and the fullwidth comma are covered.
    _OPENING_PHRASES = (
        "谢邀。", "谢邀,", "谢邀,", "谢邀",
        "泻药。", "泻药,", "泻药,", "泻药",
    )

    # NOTE(review): the original removal pattern was a raw, unprintable
    # line-separator character, which does not survive editors and review
    # tools. It is spelled out here as escapes for every character that
    # ``str.splitlines`` treats as a line boundary except "\n" and "\r" -
    # confirm which of them the scraped data actually contains.
    _SEPARATOR_TABLE = dict.fromkeys(
        map(ord, "\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029")
    )

    def add_arguments(self, parser):
        """Register the ``-p/--path`` option that names the data directory."""
        parser.add_argument(
            '-p', '--path', dest='path', type=str,
            help="数据文件目录",
        )

    def find_data_file(self, path):
        """Collect the spider data files found anywhere below ``path``.

        Returns a dict with two lists of file paths, ``"question"`` and
        ``"answer"``, selected by file-name prefix. Files matching neither
        prefix are ignored.
        """
        result = {
            "question": [],
            "answer": [],
        }
        for root, _, files in os.walk(path):
            for name in files:
                if name.startswith(self.ANSWER_FILE_PREFIX):
                    result["answer"].append(os.path.join(root, name))
                elif name.startswith(self.QUESTION_FILE_PREFIX):
                    result["question"].append(os.path.join(root, name))
        return result

    @staticmethod
    def _read_lines(file_path):
        """Return all lines of a UTF-8 text file, line endings included."""
        # The encoding is explicit so the result does not depend on the
        # locale of the machine running the command.
        with open(file_path, encoding="utf-8") as f:
            return f.readlines()

    def read_question_from_file(self, question_file):
        """Read a question data file and return its lines as a list."""
        return self._read_lines(question_file)

    def read_answer_from_file(self, answer_file):
        """Read an answer data file and return its lines as a list."""
        return self._read_lines(answer_file)

    @staticmethod
    def _get_by_platform_id(model, platform_id):
        """Return the first ``model`` row with this ``platform_id`` or None.

        The lookup runs against ``settings.SLAVE_DB_NAME``. ``filter().first()``
        is used instead of ``get()`` so that several rows sharing one
        ``platform_id`` still count as "exists"; database errors propagate to
        the caller instead of being reported as "not found".
        """
        if not platform_id:
            return None
        queryset = model.objects.using(settings.SLAVE_DB_NAME)
        return queryset.filter(platform_id=platform_id).first()

    def get_question_by_platform_id(self, platform_id):
        """Return the stored Question for ``platform_id``, or None."""
        return self._get_by_platform_id(Question, platform_id)

    def get_answer_by_platform_id(self, platform_id):
        """Return the stored Answer for ``platform_id``, or None."""
        return self._get_by_platform_id(Answer, platform_id)

    def _image_fields(self, images_data):
        """Build the model keyword arguments for every image that has a URL.

        ``handle_image_type`` is called once with all non-empty URLs; an
        image it gives no (or a falsy) type for is stored as
        ``IMAGE_TYPE.OTHER``.
        """
        usable = [image for image in images_data if image['image_url']]
        image_types = handle_image_type(
            [image['image_url'] for image in usable]
        )
        return [
            {
                "image_url": image['image_url'],
                "width": image.get("width", 0),
                "height": image.get("height", 0),
                "image_url_source": image.get(
                    "image_url_source", MEDIA_IMAGE_URL_SOURCE.CREATE
                ),
                "image_type": image_types.get(
                    image['image_url'], IMAGE_TYPE.OTHER
                ) or IMAGE_TYPE.OTHER,
            }
            for image in usable
        ]

    def save_question(self, validated_data):
        """Create a Question plus its image and tag rows; return the Question.

        ``validated_data`` must contain ``images`` and ``tags``; both keys are
        popped (the dict is modified) and the remaining items are passed to
        ``Question.objects.create``.
        """
        images_data = validated_data.pop('images')
        tags_data = validated_data.pop('tags')
        question = Question.objects.create(**validated_data)

        image_rows = self._image_fields(images_data)
        # NOTE(review): the keyword is spelled ``conent_id``; presumably it
        # matches the task's signature - confirm before renaming.
        gif_create_webp.delay(conent_id=question.id, content_type="question")
        for fields in image_rows:
            QuestionImage.objects.create(question=question, **fields)

        for tag in tags_data:
            # NOTE(review): neither ``QuestionTag`` nor ``serializers`` is
            # imported in the import block at the top of this file, so a
            # non-empty ``tags`` list would raise NameError here.
            # ``create_question`` always passes an empty list, so this is
            # currently not reached; add the imports before relying on it.
            try:
                QuestionTag(question=question, tag=tag['tag']).save()
            except serializers.ValidationError:
                # Best effort: a tag that fails validation is skipped.
                pass
        return question

    def strip_content(self, content):
        """Return ``content`` without stock opening phrases and separators.

        Every occurrence of the phrases in ``_OPENING_PHRASES`` is removed,
        then the characters in ``_SEPARATOR_TABLE`` are deleted. "\\n" and
        "\\r" are left untouched.
        """
        # NOTE(review): the original started with ``content.replace("", '')``.
        # The search string is empty as it reaches review, which makes the
        # call a no-op, so it is omitted. It may once have held an invisible
        # character - restore it here if so.
        for phrase in self._OPENING_PHRASES:
            content = content.replace(phrase, '')
        return content.translate(self._SEPARATOR_TABLE)

    def _rich_text_images(self, content):
        """Return image-info dicts for the http(s) images found in ``content``.

        Each returned dict is tagged with
        ``MEDIA_IMAGE_URL_SOURCE.RICH_TEXT`` as its ``image_url_source``.
        """
        _, urls = get_data_from_rich_text(content, "//img/@src")
        # Only the part of each URL before the first "-" is kept (presumably
        # this drops an image-style suffix - TODO confirm that the URLs never
        # contain "-" anywhere else).
        base_urls = [url.split("-")[0] for url in urls if url.startswith("http")]
        images = get_media_extra_info(base_urls)
        for image in images:
            image["image_url_source"] = MEDIA_IMAGE_URL_SOURCE.RICH_TEXT
        return images

    def create_question(self, data):
        """Create a Question from one parsed record and return its id.

        Returns None, after printing a message, when a question with the
        same ``platform_id`` is already stored. ``data`` must provide
        ``platform_id``, ``user_id``, ``title`` and ``content``; ``tags`` is
        optional.
        """
        if self.get_question_by_platform_id(data["platform_id"]):
            print("问题已经存在,platform_id=", data["platform_id"])
            return None

        _data = {
            "user": data["user_id"],
            "title": data["title"],
            "content": self.strip_content(data["content"]),
            "is_online": False,
            "images": [],
            "tags": [],
            "question_type": QA_CONTENT_TYPE.ORDINARY,
            "platform": GRABBING_PLATFORM.ZHIHU,
            "platform_id": data["platform_id"],
        }
        tags = data.get("tags")
        if tags:
            _data["platform_tag"] = ";".join(tags)

        # Images are extracted from the raw content, as before.
        _data["images"] = self._rich_text_images(data["content"])

        question = self.save_question(_data)
        return question.id

    def save_answer(self, validated_data):
        """Create an Answer plus its image rows and return the Answer.

        ``validated_data`` must contain ``images``; the key is popped (the
        dict is modified) and the remaining items are passed to
        ``Answer.objects.create``.
        """
        images_data = validated_data.pop('images')
        answer = Answer.objects.create(**validated_data)

        image_rows = self._image_fields(images_data)
        # NOTE(review): the keyword is spelled ``conent_id``; presumably it
        # matches the task's signature - confirm before renaming.
        gif_create_webp.delay(conent_id=answer.id, content_type='answer')
        for fields in image_rows:
            AnswerImage(answer=answer, **fields).save()
        return answer

    def create_answer(self, data):
        """Create an Answer from one parsed record and return its id.

        Returns None, after printing a message, when the referenced question
        has not been imported or when an answer with the same ``platform_id``
        is already stored. ``data`` must provide ``platform_id`` and
        ``user_id``; ``question_id``, ``content`` and ``tags`` are optional.
        """
        question_platform_id = data.get("question_id")
        question = self.get_question_by_platform_id(question_platform_id)
        if not question:
            # Print the value that was looked up; indexing ``data`` again
            # would raise KeyError when the key is missing altogether.
            print("创建回答时候,问题不存在,question_id=", question_platform_id)
            return None

        if self.get_answer_by_platform_id(data["platform_id"]):
            print("回答已经存在,platform_id=", data["platform_id"])
            return None

        content = self.strip_content(data.get("content", ""))
        _data = {
            "question_id": question.id,
            "user": data["user_id"],
            "content": content,
            "is_online": False,
            "platform_id": data["platform_id"],
            "content_type": QA_CONTENT_TYPE.ORDINARY,
        }
        tags = data.get("tags")
        if tags:
            _data["platform_tag"] = ";".join(tags)

        _data["images"] = self._rich_text_images(content)

        answer = self.save_answer(validated_data=_data)
        return answer.id

    def _import_lines(self, lines, create, failure_label):
        """Parse each line as JSON and pass the record to ``create``.

        A record counts as failed when ``create`` raises or returns a falsy
        id (which includes "already exists"). Failures are printed with
        ``failure_label`` and never stop the remaining lines from being
        processed.
        """
        for line in lines:
            if not line.strip():
                # Blank lines (e.g. a trailing newline) carry no record.
                continue
            try:
                record = json.loads(line)
            except ValueError:
                # One malformed line must not abort the whole import.
                traceback.print_exc()
                print("JSON解析失败: ", line.strip()[:200])
                continue

            try:
                created_id = create(record)
            except Exception:
                # Per-record boundary: report the error and keep going.
                traceback.print_exc()
                created_id = None

            if not created_id:
                if isinstance(record, dict):
                    # The content is dropped to keep the report short; the
                    # key may legitimately be missing.
                    record.pop("content", None)
                print(failure_label, record)

    def handle(self, *args, **options):
        """Import every question file, then every answer file, below ``path``.

        Raises:
            CommandError: when no data directory was given.
        """
        path = options.get('path')
        if not path:
            raise CommandError("请指定数据文件目录")

        data_file = self.find_data_file(path)

        for f_name in data_file["question"]:
            self._import_lines(
                self.read_question_from_file(f_name),
                self.create_question,
                "创建问题失败: ",
            )

        for f_name in data_file["answer"]:
            self._import_lines(
                self.read_answer_from_file(f_name),
                self.create_answer,
                "创建回答失败: ",
            )