Commit 40e2dfa4 authored by 钟尚武's avatar 钟尚武

Merge branch 'feature/weibo' into 'test'

微博数据入库

See merge request !100
parents 6503941f 9343b777
# 读取json文件 调用api 将数据导入库
import os
from datetime import datetime
import json
import requests
from random import randint
from django.core.management import BaseCommand
from engine.rpc import rpc_invoker
from api.utils.upload import upload_image
from api.cache.cache import ins_cache
from libs.user import get_user_by_ids
from alpha_types.venus import ERROR
from alpha_types.venus import GRAP_PLATFORM
from engine.logger import info_logger, error_logger, logging_exception
IMAGE_SUFFIX = '-w'
FILE_PATH = '/Users/haowei/Desktop/xhs/'
class Command(BaseCommand):
"""
Comment crawling: imports crawled topics, pictorials and their comments through RPC calls.
"""
# NOTE: every list below is a class-level attribute, so it is shared by all instances of the command.
# Base value that get_random_user_id adds a random offset (0..5000) to.
user_id_start = 241757306 # end 241806255
# Cache keys recorded by get_user_id; del_cache deletes them.
del_cache_keys = []
# Ids of topics for which the crawl/topic RPC returned a falsy result.
create_faild_topic_list = []
# Pictorial payloads for which the crawl/pictorial RPC returned a falsy result.
create_faild_pictorial_list = []
# Top-level comments for which the crawl/replys RPC returned a falsy result.
top_comment_error_create = []
# Second-level topic replies queued by create_topic; handle submits them via create_comment.
second_topic_comments = []
# Not referenced anywhere in the visible code.
second_topic_error_comments = []
# Second-level pictorial replies queued by create_pictorial.
second_pictorial_comments = []
# Not referenced anywhere in the visible code.
second_pictorial_error_comments = []
# Comments that create_comment failed to create.
second_reply_error_create = []
def del_cache(self):
    """Delete from ``ins_cache`` every key recorded in ``del_cache_keys``.

    The list itself is left untouched, so calling this twice simply
    issues the same deletions again.
    """
    for cache_key in self.del_cache_keys:
        ins_cache.delete(cache_key)
def get_random_user_id(self):
    """Return a randomly chosen user id accepted by the ``is_shadow`` RPC.

    Candidates are ``user_id_start`` plus a random offset in ``0..5000``.
    A candidate is accepted when the RPC payload is truthy and carries a
    truthy ``user_id``. The loop has no upper bound: it keeps drawing
    until a candidate is accepted.
    """
    while True:
        candidate = self.user_id_start + randint(0, 5000)
        payload = rpc_invoker['venus/community/user/is_shadow'](user_id=candidate).unwrap()
        if payload and payload.get('user_id'):
            return candidate
def get_user_id(self, id_, platform):
    """Map an external (crawled) id to an internal user id, via the cache.

    Two cache entries are written for a new mapping:

    * ``grap:<platform>:<id_>``      -> internal user id (forward mapping)
    * ``grap:<platform>:<user_id>``  -> external id (marks the user as taken)

    Both keys are queued in ``del_cache_keys`` so ``del_cache`` can remove
    them. The original code queued the "taken" marker twice and never
    queued the forward mapping, which therefore was never cleaned up.

    NOTE(review): both keys share the same ``grap:<platform>:`` namespace,
    so an external id equal to an internal user id would collide — confirm
    the two id spaces cannot overlap.

    :param id_: external identifier to look up
    :param platform: platform code, used as part of the cache key
    :return: the internal user id as an ``int``
    """
    cache_key = 'grap:{}:{}'.format(platform, id_)
    cached = ins_cache.get(cache_key)
    if cached:
        return int(cached)

    # Draw shadow users until one is found that is not already taken.
    while True:
        user_id = self.get_random_user_id()
        taken_key = 'grap:{}:{}'.format(platform, user_id)
        if not ins_cache.get(taken_key):
            ins_cache.set(taken_key, id_)
            self.del_cache_keys.append(taken_key)
            break

    ins_cache.set(cache_key, user_id)
    self.del_cache_keys.append(cache_key)
    return user_id
def get_json_data_from_dir(self, is_topic=None, is_pictorial=None):
    """Load every JSON file found directly inside the selected data directory.

    :param is_topic: truthy to read ``FILE_PATH + 'topic/'``
    :param is_pictorial: truthy to read ``FILE_PATH + 'pictorial/'``;
        takes precedence when both flags are set (as before)
    :return: list of parsed JSON documents; empty when no flag is set or
        the directory does not exist
    """
    if is_pictorial:
        file_path = FILE_PATH + 'pictorial/'
    elif is_topic:
        file_path = FILE_PATH + 'topic/'
    else:
        # Previously this fell through to an unbound ``file_path``.
        return []

    # Only the first os.walk() entry describes ``file_path`` itself. The
    # old loop kept the names of the *last* directory walked, which are
    # wrong as soon as the directory contains a sub-directory.
    _, _, filenames = next(os.walk(file_path), (file_path, [], []))
    return [self.get_file_json_data(file_path + filename) for filename in filenames]
def get_file_json_data(self, file):
    """Parse a JSON file and return the decoded document.

    The file is read as UTF-8 regardless of the platform's default
    encoding; the ``utf-8-sig`` codec drops a leading byte-order mark if
    one is present, replacing the manual encode/slice/decode round trip.

    :param file: path of the JSON file
    :return: the decoded JSON value
    :raises OSError: if the file cannot be opened
    :raises ValueError: if the content is not valid JSON
    """
    with open(file, 'r', encoding='utf-8-sig') as f:
        return json.load(f)
def get_image_size(self, image_url):
    """Fetch the width and height of an uploaded image.

    Requests ``<image_url><IMAGE_SUFFIX>?imageInfo`` and reads ``width``
    and ``height`` from the JSON response.

    :param image_url: URL of the uploaded image
    :return: ``(width, height)``; ``(None, None)`` when the request or the
        decoding fails (the exception is logged, not raised)
    """
    try:
        url = image_url + IMAGE_SUFFIX + '?imageInfo'
        # A timeout keeps one stalled connection from hanging the whole
        # import; requests waits indefinitely when none is given.
        response = requests.request("GET", url, timeout=10)
        info = response.json()
        return info.get('width'), info.get('height')
    except Exception:
        logging_exception()
        return None, None
def image_info(self, urls):
    """Upload each source image and collect its stored path and dimensions.

    NOTE(review): both retry loops are unbounded — an image that can never
    be uploaded or measured keeps this method spinning forever.

    :param urls: iterable of source image URLs
    :return: list of ``{'url', 'height', 'width'}`` dicts, one per input
        URL, where ``url`` has the ``http://alpha.gmeiapp.com/`` prefix removed
    """
    collected = []
    for source_url in urls:
        # Retry the upload until it yields a truthy URL.
        uploaded_url = None
        while not uploaded_url:
            uploaded_url = upload_image(source_url)

        # Retry the size lookup until at least one dimension is truthy.
        while True:
            width, height = self.get_image_size(uploaded_url)
            if width or height:
                break

        entry = {
            'url': uploaded_url.replace('http://alpha.gmeiapp.com/', ''),
            'height': height,
            'width': width,
        }
        collected.append(entry)
    return collected
def revise_comments(self, comment):
    """Split a crawled comment into the top-level comment and its replies.

    The comment is mutated in place: ``content`` is copied from
    ``comment`` and the ``reply`` entry is removed. Each reply is tagged
    (also in place) with the parent's ``id`` as ``reply_id`` and with the
    parent's ``type``.

    :param comment: crawled comment dict
    :return: ``(comment, replies)``; ``replies`` is empty when there are none
    """
    comment['content'] = comment.get('comment')
    children = comment.pop('reply', None) or []

    tagged = []
    for child in children:
        child['reply_id'] = comment.get('id')
        child['type'] = comment.get('type')
        tagged.append(child)
    return comment, tagged
def create_comment(self, comment, platform):
    """Create a single reply through the ``crawl/replys`` RPC.

    The target topic / pictorial ids are read from the comment itself.
    When the RPC returns a falsy result the comment is recorded in
    ``second_reply_error_create``.

    :param comment: reply payload
    :param platform: platform code forwarded to the RPC
    """
    created = rpc_invoker['venus/community/crawl/replys'](
        data=[comment],
        platform=platform,
        topic_id=comment.get('topic_id'),
        pictorial_id=comment.get('pictorial_id'),
    ).unwrap()
    if created:
        return
    self.second_reply_error_create.append(comment)
def create_topic(self, topics, platform):
"""
Create topics from crawled data and create their top-level comments.

Only the first 100 entries of ``topics`` are handled. Second-level replies
are not created here; they are queued in ``second_topic_comments``.
Always returns ``(None, None)``.

NOTE(review): the source's indentation was lost, so the nesting described
in the comments below is inferred from the statements — confirm.
"""
count = 0
for topic in topics[:100]:
count += 1
topic_comments = topic.pop('comments', None)
# 'image' is mandatory: pop() without a default raises KeyError when it is missing
images = topic.pop('image')
topic['images'] = self.image_info(images)
# the poster's user id is looked up by the crawled topic id
topic['user_id'] = self.get_user_id(id_=topic.get('id'), platform=platform)
topic.pop('user')
print('-------- topic current count: ', count)
topic_obj = rpc_invoker['venus/community/crawl/topic'](data=topic, platform=platform, pictorial_id=None).unwrap()
# a falsy RPC result is recorded as a failure and the topic's comments are skipped
if not topic_obj:
self.create_faild_topic_list.append(topic.get('id'))
continue
print('-------- return topic info: ', topic_obj)
if not topic_comments:
continue
# comments are only imported for the XIAOHONGSHU platform
if platform == GRAP_PLATFORM.XIAOHONGSHU:
for topic_comment in topic_comments:
topic_comment['topic_id'] = topic_obj.get('id')
top_comment, comments = self.revise_comments(topic_comment)
# the commenter's user id is looked up by the crawled user's id
top_comment['user_id'] = self.get_user_id(id_=top_comment.get('user').get('id'), platform=platform)
top_comment.pop('user')
ret = rpc_invoker['venus/community/crawl/replys'](data=[top_comment], platform=platform, topic_id=topic_obj.get('id'), pictorial_id=None).unwrap()
if not ret:
self.top_comment_error_create.append(top_comment)
continue
if not comments:
continue
# the first created reply id becomes the parent (top_id) of the second-level replies
top_id = ret.get('reply_ids')[0]
for obj in comments:
obj['user_id'] = self.get_user_id(id_=obj.get('user').get('id'), platform=platform)
obj['top_id'] = top_id
obj.pop('user')
# rpc_invoker['venus/community/crawl/replys'](data=comments, platform=platform, topic_id=topic_id, pictorial_id=pictorial_id, top_id=top_id).unwrap()
# second-level replies are queued rather than created here
self.second_topic_comments.extend(comments)
# print('-------- topic_comments:', self.second_topic_comments)
# print('-------- top_comment_error_create:', self.top_comment_error_create)
# presumably runs once after the loop — confirm against the original indentation
self.del_cache()
return None, None
def create_pictorial(self, pictorial, platform):
    """Create a pictorial, one topic per image, and its top-level comments.

    Steps:

    1. upload the pictorial's images and build one topic payload per image;
    2. create the pictorial through the ``crawl/pictorial`` RPC;
    3. create the topics under it;
    4. for the XIAOHONGSHU platform, create the top-level comments and
       queue their second-level replies in ``second_pictorial_comments``.

    Fixes over the previous version:

    * when the pictorial RPC returns a falsy result the method now stops
      after recording the failure instead of calling ``.get`` on it;
    * the name is the first 20 characters of the content; content shorter
      than 20 characters used to lose its last character.

    :param pictorial: crawled pictorial dict (mutated in place)
    :param platform: platform code forwarded to the RPCs
    :return: always ``(None, None)``
    """
    if not pictorial:
        return None, None

    pictorial_comments = pictorial.pop('comments', None)
    images = self.image_info(pictorial.pop('image'))

    # One topic per uploaded image; the topic id is the pictorial id
    # followed by the 1-based image index.
    topics = []
    for index, image in enumerate(images, start=1):
        topics.append({
            'id': pictorial.get('id') + str(index),
            'content': pictorial.get('content'),
            'images': [image],
            'create_time': pictorial.get('create_time'),
            'user_id': self.get_user_id(id_=image.get('url'), platform=platform),
        })

    pictorial['user_id'] = self.get_user_id(id_=pictorial.get('id'), platform=platform)
    pictorial['description'] = pictorial.get('content')
    # Pictorial name: the first 20 characters of the crawled content.
    pictorial['name'] = pictorial.get('content')[:20]

    pictorial_obj = rpc_invoker['venus/community/crawl/pictorial'](data=pictorial, platform=platform).unwrap()
    if not pictorial_obj:
        self.create_faild_pictorial_list.append(pictorial)
        return None, None
    pictorial_id = pictorial_obj.get('id')

    for topic in topics:
        rpc_invoker['venus/community/crawl/topic'](data=topic, platform=platform, pictorial_id=pictorial_id).unwrap()

    # Comments are only imported for the XIAOHONGSHU platform.
    if not pictorial_comments or platform != GRAP_PLATFORM.XIAOHONGSHU:
        return None, None

    for pictorial_comment in pictorial_comments:
        pictorial_comment['pictorial_id'] = pictorial_id
        top_comment, comments = self.revise_comments(pictorial_comment)
        top_comment['user_id'] = self.get_user_id(id_=top_comment.get('user').get('id'), platform=platform)
        top_comment.pop('user')
        ret = rpc_invoker['venus/community/crawl/replys'](data=[top_comment], platform=platform, pictorial_id=pictorial_id).unwrap()
        if not ret:
            self.top_comment_error_create.append(top_comment)
            continue
        if not comments:
            continue
        # The first created reply id is the parent of the second-level replies.
        top_id = ret.get('reply_ids')[0]
        for reply in comments:
            reply['user_id'] = self.get_user_id(id_=reply.get('user').get('id'), platform=platform)
            reply['top_id'] = top_id
            reply.pop('user')
        # Second-level replies are queued rather than created here.
        self.second_pictorial_comments.extend(comments)
    return None, None
def handle(self, *args, **options):
"""
Entry point of the command: import topics, then pictorials, then the
queued second-level topic replies, printing progress and failure counts.

NOTE(review): the source's indentation was lost, so which print statements
sit inside the loops is inferred — confirm.
"""
# NOTE(review): presumably the GRAP_PLATFORM.XIAOHONGSHU code, since only that platform's comments are imported — confirm
platform = 4
# topics
print('----- start deal topic at {} -----'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S %f')))
topic_data = self.get_json_data_from_dir(is_topic=1)
self.create_topic(topics=topic_data, platform=platform)
print('-------- create_faild_topic_list:', len(self.create_faild_topic_list))
print('-------- second_topic_comments:', len(self.second_topic_comments))
print('----- end deal topic at {} -----'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S %f')))
# pictorials
print('----- start deal pictorial at {} -----'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S %f')))
pictorial_data = self.get_json_data_from_dir(is_pictorial=1)
count = 0
# only the first 50 pictorials are processed
for pictorial in pictorial_data[:50]:
count += 1
print('------- current pictorial count :', count)
self.create_pictorial(pictorial=pictorial, platform=platform)
print('-------- create_faild_pictorial_list:', len(self.create_faild_pictorial_list))
print('-------- top_comment_error_create:', len(self.top_comment_error_create))
print('----- end deal pictorial at {} -----'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S %f')))
# second-level comments
print('----- start deal second topic reply at {} -----'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S %f')))
count = 0
# only the first 10 queued second-level topic replies are submitted
# NOTE(review): second_pictorial_comments is never submitted in the visible code
for topic_comment in self.second_topic_comments[:10]:
count += 1
print('------- current second topic reply count :', count)
self.create_comment(comment=topic_comment, platform=platform)
print('-------- second_reply_error_create:', len(self.second_reply_error_create))
print('----- end deal second topic reply at {} -----'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S %f')))
# NOTE(review): the condition checks top_comment_error_create but the line prints second_topic_comments — looks like a copy/paste slip
if len(self.top_comment_error_create) > 0:
print('-------- second_topic_comments:', len(self.second_topic_comments))
if len(self.create_faild_topic_list) > 0:
print('-------- create_faild_topic_list:', len(self.create_faild_topic_list))
if len(self.create_faild_pictorial_list) > 0:
print('-------- create_faild_pictorial_list:', len(self.create_faild_pictorial_list))
self.del_cache()
""" 微博帖子入库榜单脚本 """
import time
import os
import re
from datetime import datetime
import json
import requests
from random import randint
from django.core.management import BaseCommand
from engine.rpc import rpc_invoker
from api.utils.upload import upload_weibo_image
from api.cache.cache import ins_cache
from engine.logger import info_logger, error_logger, logging_exception
IMAGE_SUFFIX = '-w'
FILE_PATH = '/Users/zhongshangwu/workspace/gengmei/like/saturn/api/management/commands/'
# TODO
# 1. 图片裁剪上传
# 2. 过滤二级带图评论
# 3. 其他的过滤规则
#
class Command(BaseCommand):
# NOTE: every container below is a class-level attribute, so it is shared by all instances of the command.
# Base value that get_random_user_id adds a random offset (0..5000) to.
user_id_start = 241757306 # end 241806255
# Not referenced by the visible methods of this class.
del_cache_keys = []
# Ids of topics for which the crawl/topic RPC returned a falsy result.
create_faild_topic_list = []
# Pictorial payloads for which the crawl/pictorial RPC returned a falsy result.
create_faild_pictorial_list = []
# First-level pictorial comments whose crawl/replys result carried no reply_ids.
top_pictorial_error_comments = []
# Second-level pictorial replies for which the crawl/replys RPC returned a falsy result.
second_pictorial_error_comments = []
# Replies under a created topic for which the crawl/replys RPC returned a falsy result.
topic_error_comments = []
# Per-weibo-id summary written by create_pictorial and pretty-printed by handle.
stats = {}
def get_random_user_id(self):
    """Return a random shadow ("vest") user id accepted by the ``is_shadow`` RPC.

    Candidates are ``user_id_start`` plus a random offset in ``0..5000``.
    A candidate is accepted when the RPC payload is truthy and carries a
    truthy ``user_id``. The loop has no upper bound: it keeps drawing
    until a candidate is accepted.
    """
    while True:
        candidate = self.user_id_start + randint(0, 5000)
        payload = rpc_invoker['venus/community/user/is_shadow'](user_id=candidate).unwrap()
        if payload and payload.get('user_id'):
            return candidate
def get_user_id(self, weibo_id, weibo_user_id, platform):
    """Return the internal user id that stands in for a Weibo user on one post.

    Relationships between users under the same Weibo post have to carry
    over, so the mapping is kept per post in a cache hash:
    ``grap:<platform>:<weibo_id>`` maps ``weibo_user_id`` to the internal
    user id. A missing entry is filled with a random shadow user.

    :param weibo_id: id of the Weibo post, part of the hash key
    :param weibo_user_id: id of the Weibo user, the hash field
    :param platform: platform code, part of the hash key
    :return: the internal user id
    """
    weibo_cache_key = "grap:{0}:{1}".format(platform, weibo_id)
    cached = ins_cache.hget(weibo_cache_key, weibo_user_id)
    if cached:
        return int(cached)

    fresh_user_id = self.get_random_user_id()
    ins_cache.hset(weibo_cache_key, weibo_user_id, fresh_user_id)
    return fresh_user_id
def get_weibo_id(self, content):
    """Return the id of the Weibo user attached to ``content``.

    :param content: crawled post or comment dict
    :return: ``content['user']['id']``, or ``None`` when the ``user`` entry
        is missing, empty or ``None`` (a JSON ``null`` used to raise
        ``AttributeError``)
    """
    user = content.get('user') or {}
    return user.get('id')
def get_json_data_from_dir(self, is_topic=None, is_pictorial=None):
    """Load every JSON file found directly inside the selected data directory.

    :param is_topic: truthy to read ``FILE_PATH + 'topic/'``
    :param is_pictorial: truthy to read ``FILE_PATH + 'pictorial/'``;
        takes precedence when both flags are set (as before)
    :return: list of parsed JSON documents; empty when no flag is set or
        the directory does not exist
    """
    if is_pictorial:
        file_path = FILE_PATH + 'pictorial/'
    elif is_topic:
        file_path = FILE_PATH + 'topic/'
    else:
        # Previously this fell through to an unbound ``file_path``.
        return []

    # Only the first os.walk() entry describes ``file_path`` itself. The
    # old loop kept the names of the *last* directory walked, which are
    # wrong as soon as the directory contains a sub-directory.
    _, _, filenames = next(os.walk(file_path), (file_path, [], []))
    return [self.get_file_json_data(file_path + filename) for filename in filenames]
def get_file_json_data(self, file):
    """Parse a JSON file and return the decoded document.

    The file is read as UTF-8 regardless of the platform's default
    encoding; the ``utf-8-sig`` codec drops a leading byte-order mark if
    one is present, replacing the manual encode/slice/decode round trip.

    :param file: path of the JSON file
    :return: the decoded JSON value
    :raises OSError: if the file cannot be opened
    :raises ValueError: if the content is not valid JSON
    """
    with open(file, 'r', encoding='utf-8-sig') as f:
        return json.load(f)
def get_image_size(self, image_url):
    """Fetch the width and height of an uploaded image.

    Requests ``<image_url><IMAGE_SUFFIX>?imageInfo`` and reads ``width``
    and ``height`` from the JSON response.

    :param image_url: URL of the uploaded image
    :return: ``(width, height)``; ``(None, None)`` when the request or the
        decoding fails (the exception is logged, not raised)
    """
    try:
        url = image_url + IMAGE_SUFFIX + '?imageInfo'
        # A timeout keeps one stalled connection from hanging the whole
        # import; requests waits indefinitely when none is given.
        response = requests.request("GET", url, timeout=10)
        info = response.json()
        return info.get('width'), info.get('height')
    except Exception:
        logging_exception()
        return None, None
def image_info(self, urls):
"""
Upload each source image and collect its stored path and dimensions.

Returns a list of ``{'url', 'height', 'width'}`` dicts where ``url`` has the
``http://alpha.gmeiapp.com/`` prefix removed.

NOTE(review): both retry loops are unbounded — an image that can never be
uploaded or measured keeps this method spinning forever.
"""
# collect image information
ret = []
for url in urls:
# retry the upload until it yields a truthy URL
image_url = upload_weibo_image(url)
while not image_url:
image_url = upload_weibo_image(url)
# retry the size lookup while both dimensions are missing
width, height = self.get_image_size(image_url)
while not width and not height:
width, height = self.get_image_size(image_url)
# NOTE(review): indentation was lost; presumably printed once per image after the retries — confirm
print(image_url)
ret.append(
{
'url': image_url.replace('http://alpha.gmeiapp.com/', ''),
'height': height,
'width': width,
}
)
return ret
def revise_comments(self, weibo_id, comment, platform):
""" Split a crawled first-level comment from its second-level replies.

Returns ``(None, [])`` when ``filter_first_comment`` rejects the comment,
otherwise ``(comment, replies)``. The comment and the kept replies are
mutated in place: ``user_id`` is added and ``user`` is removed.

NOTE(review): the source's indentation was lost, so the nesting described
in the comments below is inferred — confirm.
"""
replies = []
# direct indexing: a comment lacking "content", "images" or "reply" raises KeyError here
if self.filter_first_comment(comment["content"], comment["images"], comment["reply"]):
return None, []
reply = comment.pop('reply', None)
# the user id must be resolved before the "user" entry is removed
comment["user_id"] = self.get_user_id(weibo_id, self.get_weibo_id(comment), platform)
comment.pop("user", None)
# NOTE(review): filter_first_comment already rejects comments without replies, so this branch looks unreachable — confirm
if not reply:
return comment, replies
# NOTE(review): `images` is never read afterwards
images = []
# a comment without images of its own takes the images common to all of its replies (set intersection, so the order is unspecified)
if not comment["images"]:
normal_images = set(reply[0]["images"])
for info in reply[1:]:
info_images = set(info["images"])
normal_images = normal_images & info_images
comment["images"] = list(normal_images)
for info in reply:
# skip replies rejected by the second-level filter
if self.filter_second_comment(info["content"], comment["images"], info["images"]):
continue
info['reply_id'] = comment.get('id') # Weibo-side id of the parent comment; used to find the matching parent on our platform at creation time
info['user_id'] = self.get_user_id(weibo_id, self.get_weibo_id(info), platform)
info.pop("user", None)
info.pop("images", None) # second-level comments do not carry images
replies.append(info)
return comment, replies
def create_pictorial(self, pictorial, platform):
    """Create a pictorial from a Weibo post and import its comments.

    The post becomes a pictorial. Each accepted first-level comment is then
    imported in one of two ways:

    * with images    -> a topic inside the pictorial, its replies becoming
      the topic's comments;
    * without images -> a first-level pictorial comment, its replies
      becoming second-level comments.

    At most 50 replies are imported per comment. Failures are recorded in
    the ``*_error_comments`` / ``create_faild_*`` lists and a per-post
    summary is stored in ``stats[weibo_id]``.

    Fixes over the previous version:

    * returns the pictorial RPC result on success (it used to return
      ``None`` on every path, so callers could not tell success from failure);
    * a post without a ``comments`` entry no longer crashes;
    * a falsy result of the first-level reply RPC is recorded as an error
      instead of raising ``AttributeError``;
    * the name is the first 20 characters of the content; content shorter
      than 20 characters used to lose its last character.

    :param pictorial: crawled Weibo post (mutated in place)
    :param platform: platform code forwarded to the RPCs
    :return: the pictorial RPC result on success; ``None`` when the post is
        filtered out or the pictorial could not be created
    """
    pictorial["content"] = pictorial["content"].replace("#", " ")
    if self.filter_weibo(pictorial["content"], pictorial["images"]):
        return None

    weibo_id = pictorial.get('id')
    pictorial['user_id'] = self.get_user_id(
        weibo_id=weibo_id,
        weibo_user_id=self.get_weibo_id(pictorial),
        platform=platform
    )

    # Pictorial name: the first 20 characters of the crawled content.
    pictorial['name'] = pictorial['content'][:20]
    pictorial['description'] = pictorial['content']

    weibo_comments = pictorial.pop('comments', None) or []

    pictorial_obj = rpc_invoker['venus/community/crawl/pictorial'](data=pictorial, platform=platform).unwrap()
    if not pictorial_obj:
        self.create_faild_pictorial_list.append(pictorial)
        return None
    pictorial_id = pictorial_obj.get('id')

    post_stats = {"topics": {}, "first_comments": {}}
    self.stats[weibo_id] = post_stats

    for weibo_comment in self.filter_duplicate_comments(weibo_comments):
        # Resolve the author first: revise_comments removes the "user" entry.
        user_id = self.get_user_id(
            weibo_id=weibo_id,
            weibo_user_id=self.get_weibo_id(weibo_comment),
            platform=platform
        )
        comment, replies = self.revise_comments(weibo_id, weibo_comment, platform)
        if not comment:
            continue

        if comment["images"]:
            # First-level comment with images -> topic inside the pictorial.
            topic = {
                'id': comment['id'],
                'content': comment.get('content', ''),
                'images': self.image_info(comment.pop('images')),
                'create_time': comment.get('create_time'),
                'user_id': user_id,
            }
            topic_obj = rpc_invoker['venus/community/crawl/topic'](data=topic, platform=platform, pictorial_id=pictorial_id).unwrap()
            if not topic_obj:
                self.create_faild_topic_list.append(topic.get('id'))
                continue

            post_stats["topics"][comment['id']] = {"reply": []}
            replies = replies[:50]
            ret = rpc_invoker['venus/community/crawl/replys'](data=replies, platform=platform, topic_id=topic_obj.get('id'), pictorial_id=None).unwrap()
            if not ret:
                self.topic_error_comments.extend(replies)
            else:
                post_stats["topics"][comment['id']] = {
                    "reply": [item["id"] for item in replies]
                }
        else:
            # First-level comment without images -> pictorial comment.
            top_comments_obj = rpc_invoker['venus/community/crawl/replys'](data=[comment], platform=platform, pictorial_id=pictorial_id).unwrap()
            reply_ids = (top_comments_obj or {}).get("reply_ids")
            if not reply_ids:
                self.top_pictorial_error_comments.append(comment)
                continue

            post_stats["first_comments"][comment['id']] = {"reply": []}
            replies = replies[:50]
            # The first created reply id is the parent of the second-level replies.
            for reply in replies:
                reply["top_id"] = reply_ids[0]
            ret = rpc_invoker['venus/community/crawl/replys'](data=replies, platform=platform, topic_id=None, pictorial_id=pictorial_id).unwrap()
            if not ret:
                self.second_pictorial_error_comments.extend(replies)
            else:
                post_stats["first_comments"][comment['id']] = {
                    "reply": [item["id"] for item in replies]
                }

    return pictorial_obj
def filter_duplicate_comments(self, comments):
    """Drop comments whose ``id`` occurs more than once.

    For each id the *last* occurrence is kept; the survivors keep their
    original relative order.

    :param comments: list of comment dicts, or ``None``
    :return: new list without duplicates; empty for ``None`` or empty
        input (``None`` used to raise ``TypeError``)
    """
    if not comments:
        return []

    seen_ids = set()
    kept = []
    # Walk backwards so the last occurrence of each id is seen first.
    for comment in reversed(comments):
        if comment["id"] in seen_ids:
            continue
        seen_ids.add(comment["id"])
        kept.append(comment)
    kept.reverse()
    return kept
def filter_weibo(self, content, images):
    """Decide whether a Weibo post must be skipped.

    :return: a truthy value when ``filter_normal`` rejects the content or
        ``filter_gif_images`` finds a GIF; the value is that of the first
        check that triggers, otherwise the result of the GIF check
    """
    verdict = self.filter_normal(content)
    if verdict:
        return verdict
    return self.filter_gif_images(images)
def filter_first_comment(self, content, images, replies):
    """Decide whether a first-level comment must be skipped.

    A comment is skipped when ``filter_normal`` rejects its content, when
    one of its images is a GIF, or when it has no replies.

    :return: truthy when the comment must be skipped; the value is that of
        the first check that triggers
    """
    verdict = self.filter_normal(content)
    if not verdict:
        verdict = self.filter_gif_images(images)
    if not verdict:
        verdict = not replies
    return verdict
def filter_second_comment(self, content, first_images, images):
    """Decide whether a second-level comment must be skipped.

    A reply is skipped when ``filter_normal`` rejects its content or when
    ``filter_second_images`` reports images the parent comment does not have.

    :return: truthy when the reply must be skipped
    """
    verdict = self.filter_normal(content)
    if verdict:
        return verdict
    return self.filter_second_images(first_images, images)
def filter_gif_images(self, images):
    """Return ``True`` when any entry of ``images`` contains ``".gif"``."""
    return any(".gif" in image for image in images)
def filter_second_images(self, first_images, images):
    """Return the images of a second-level comment that its parent lacks.

    ``None`` is treated as "no images" on either side.

    :return: a set; non-empty (truthy) means the reply must be filtered out
    """
    return set(images or []).difference(first_images or [])
def filter_normal(self, content):
    """Content rules shared by posts and comments.

    Text is rejected when it contains a mention marker, one of the
    platform-specific words, the "collapse full text" marker, a link, or
    a ``#hashtag#``.

    :return: ``True`` for a substring hit; otherwise the hashtag regex
        result (a match object, or ``None`` when the text is acceptable)
    """
    banned_fragments = (
        "@",
        "榜姐", "渣浪", "微博",
        "收起全文",
        "http://", "https://",
    )
    if any(fragment in content for fragment in banned_fragments):
        return True
    return re.search("#[^#]+#", content)
def handle(self, *args, **options):
"""
Entry point of the command: turn every crawled Weibo post found in the
pictorial data directory into a pictorial, then print the collected
statistics and failure counts.

NOTE(review): the source's indentation was lost, so which statements sit
inside the loop is inferred — confirm.
"""
# print(upload_weibo_image("http://wx2.sinaimg.cn/woriginal/bbeff396ly1fxqi1wl2hqj20go0bvabl.jpg"))
platform = 5 # Weibo
# pictorials
print('----- start deal weibo blog at {} -----'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S %f')))
pictorial_data = self.get_json_data_from_dir(is_pictorial=1)
for idx, pictorial in enumerate(pictorial_data):
print('------- current pictorial idx :', idx)
# time each post's import and print the elapsed seconds
start = time.time()
ret = self.create_pictorial(pictorial=pictorial, platform=platform)
print("escape:", time.time() - start)
# NOTE(review): this relies on create_pictorial returning a truthy value on success — confirm it does
if not ret:
print("create failed weibo blog:", pictorial["id"])
import pprint
pprint.pprint(self.stats)
print('-------- create_faild_topic_list:', len(self.create_faild_topic_list))
print('-------- topic_error_comments:', len(self.topic_error_comments))
print('-------- create_faild_pictorial_list:', len(self.create_faild_pictorial_list))
print('-------- top_pictorial_error_comments:', len(self.top_pictorial_error_comments))
print('-------- second_pictorial_error_comments:', len(self.second_pictorial_error_comments))
\ No newline at end of file
import requests
from gm_upload import upload, upload_file
from gm_upload import IMG_TYPE
import io
from PIL import Image
# import numpy as np
def upload_image(url, img_type=IMG_TYPE.TOPIC):
......@@ -10,3 +13,24 @@ def upload_image(url, img_type=IMG_TYPE.TOPIC):
return upload(response.content, img_type=img_type)
except:
return None
def upload_weibo_image(url, img_type=IMG_TYPE.TOPIC):
    """Download an off-site image, trim its bottom edge and upload it.

    The image is fetched from ``url``, the bottom 10 pixel rows are cropped
    away, the result is re-encoded as PNG and passed to ``upload``.

    :param url: source image URL
    :param img_type: upload category forwarded to ``upload``
    :return: whatever ``upload`` returns, or ``None`` when downloading,
        decoding, cropping or uploading fails
    """
    try:
        # Without a timeout one stalled download blocks the import forever.
        response = requests.get(url, timeout=30)
        img = Image.open(io.BytesIO(response.content))
        w, h = img.size
        img = img.crop((0, 0, w, h - 10))
        buffer = io.BytesIO()
        img.save(buffer, format="png")
        return upload(buffer.getvalue(), img_type=img_type)
    except Exception:
        # ``except Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt / SystemExit still propagate; callers retry in
        # a loop and could otherwise not be interrupted.
        return None
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment