# Read the crawled JSON dumps, call the venus RPC API and import the data into the database.
import os
from datetime import datetime
import json
import requests
import random
from random import randint
from django.core.management import BaseCommand
from django.core.management.base import CommandError
from engine.rpc import rpc_invoker
from api.utils.upload import upload_image
from api.cache.cache import ins_cache
from libs.user import get_user_by_ids
from alpha_types.venus import ERROR
from alpha_types.venus import GRAP_PLATFORM
from engine.logger import info_logger, error_logger, logging_exception

IMAGE_SUFFIX = '-w'
FILE_PATH = '/srv/apps/saturn/xiaohongshu/'
# Upper bound on attempts for each image upload and each image-info lookup.
# The previous unbounded retry loops hung the whole command forever on a
# permanently broken image.
MAX_RETRIES = 5


class Command(BaseCommand):
    """Import crawled topics, pictorials and their comments.

    Reads the JSON dumps under ``FILE_PATH`` (``topic/`` and ``pictorial/``),
    re-uploads their images, assigns crawled authors a randomly chosen
    "shadow" user id and creates the records through the venus crawl RPC
    endpoints.
    """

    user_id_start = 241757306  # end 241806255
    # Size of the shadow-user id window requested from the RPC, starting at
    # ``user_id_start``.
    shadow_user_count = 5000
    # Number of second-level topic replies ``handle`` creates; ``None`` means all.
    # NOTE(review): the cap of 10 looks like a leftover debugging limit — confirm.
    second_reply_limit = 10

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._reset_state()

    def _reset_state(self):
        """Create fresh per-instance bookkeeping lists.

        These used to be mutable class attributes, so separate Command
        instances in the same process shared and accumulated them.
        """
        self.del_cache_keys = []
        self.create_faild_topic_list = []
        self.create_faild_pictorial_list = []
        self.top_comment_error_create = []
        self.second_topic_comments = []
        self.second_topic_error_comments = []
        self.second_pictorial_comments = []
        self.second_pictorial_error_comments = []
        self.second_reply_error_create = []
        self.shadow_user_ids = []

    @staticmethod
    def _now():
        """Return the current local time formatted for the progress log."""
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S %f')

    def del_cache(self):
        """Delete every cache key recorded in ``del_cache_keys``, then forget them."""
        for key in self.del_cache_keys:
            ins_cache.delete(key)
        self.del_cache_keys.clear()

    def load_shadow_users(self):
        """Load the pool of shadow user ids that crawled authors are mapped to."""
        data = rpc_invoker['venus/community/crawl/load_shawdow_user'](
            start_user_id=self.user_id_start,
            end_user_id=self.user_id_start + self.shadow_user_count,
        ).unwrap()
        # Normalise a None payload so get_random_user_id can report it clearly.
        self.shadow_user_ids = data or []

    def get_random_user_id(self):
        """Return a random id from the pool loaded by ``load_shadow_users``.

        Raises:
            CommandError: if no shadow users were loaded. Without this check
                ``random.choice`` would fail with an opaque IndexError.
        """
        if not self.shadow_user_ids:
            raise CommandError('no shadow users loaded between {} and {}'.format(
                self.user_id_start, self.user_id_start + self.shadow_user_count))
        return random.choice(self.shadow_user_ids)

    def get_user_id(self, id_, platform):
        """Return the shadow user id to use for the crawled entity ``id_``.

        If the cache holds a value under ``grap:<platform>:<id_>``, that value
        is reused (as an int). Otherwise a random shadow user is picked.

        NOTE(review): this command never writes that cache key (the write was
        disabled). Unless something else populates it, every call returns a
        fresh random user — confirm that is intended.
        """
        cache_key = 'grap:{}:{}'.format(platform, id_)
        value = ins_cache.get(cache_key)
        if value:
            return int(value)
        return self.get_random_user_id()

    def get_json_data_from_dir(self, is_topic=None, is_pictorial=None):
        """Load every file directly inside the topic or pictorial dump directory.

        Args:
            is_topic: truthy to read ``FILE_PATH + 'topic/'``.
            is_pictorial: truthy to read ``FILE_PATH + 'pictorial/'``. It takes
                precedence when both flags are set, as before.

        Returns:
            A list with the parsed JSON of each file, in file-name order.
            Returns an empty list when the directory does not exist.

        Raises:
            ValueError: if neither flag is set.
        """
        if is_pictorial:
            file_path = FILE_PATH + 'pictorial/'
        elif is_topic:
            file_path = FILE_PATH + 'topic/'
        else:
            raise ValueError('either is_topic or is_pictorial must be set')
        if not os.path.isdir(file_path):
            return []
        # Top-level files only. The old os.walk loop kept the names of the
        # *last* directory walked but joined them with the top-level path.
        filenames = sorted(
            name for name in os.listdir(file_path)
            if os.path.isfile(os.path.join(file_path, name))
        )
        return [self.get_file_json_data(file_path + name) for name in filenames]

    def get_file_json_data(self, file):
        """Parse one JSON file, tolerating a leading UTF-8 BOM."""
        # 'utf-8-sig' strips an optional BOM. Unlike the locale default, it also
        # decodes non-ASCII content whatever the server's locale is.
        with open(file, 'r', encoding='utf-8-sig') as f:
            return json.load(f)

    def get_image_size(self, image_url):
        """Return ``(width, height)`` of an uploaded image.

        Returns ``(None, None)`` when the lookup fails; the failure is logged.
        """
        try:
            url = image_url + IMAGE_SUFFIX + '?imageInfo'
            response = requests.get(url, timeout=3)
            response.raise_for_status()
            info = response.json()
            return info.get('width'), info.get('height')
        except Exception:
            logging_exception()
            return None, None

    def image_info(self, urls):
        """Re-upload crawled images and collect their storage path and size.

        Each upload and each size lookup is tried up to ``MAX_RETRIES`` times.
        An image that still fails is skipped and reported, instead of
        blocking the whole import.

        Args:
            urls: iterable of crawled image URLs (``None`` is treated as empty).

        Returns:
            A list of ``{'url', 'height', 'width'}`` dicts. ``url`` has the
            ``http://alpha.gmeiapp.com/`` prefix removed.
        """
        ret = []
        for url in urls or []:
            image_url = None
            for _ in range(MAX_RETRIES):
                image_url = upload_image(url)
                if image_url:
                    break
            if not image_url:
                print('-------- image upload failed, skipped: ', url)
                continue

            width = height = None
            for _ in range(MAX_RETRIES):
                width, height = self.get_image_size(image_url)
                if width or height:
                    break
            if not (width or height):
                print('-------- image info failed, skipped: ', image_url)
                continue

            ret.append({
                'url': image_url.replace('http://alpha.gmeiapp.com/', ''),
                'height': height,
                'width': width,
            })
        return ret

    def revise_comments(self, comment):
        """Split a crawled comment into its top-level part and its replies.

        Copies ``comment['comment']`` into ``content`` and removes ``reply``
        from ``comment``. Each reply is tagged with the parent id
        (``reply_id``) and the parent's ``type``.

        Returns:
            ``(comment, replies)``. ``comment`` is the same dict, mutated in place.
        """
        comment['content'] = comment.get('comment')
        replies = comment.pop('reply', None) or []
        for info in replies:
            info['reply_id'] = comment.get('id')
            info['type'] = comment.get('type')
        return comment, list(replies)

    def create_comment(self, comment, platform):
        """Create one second-level reply; record it in ``second_reply_error_create`` on failure."""
        ret = rpc_invoker['venus/community/crawl/replys'](
            data=[comment],
            platform=platform,
            topic_id=comment.get('topic_id'),
            pictorial_id=comment.get('pictorial_id'),
        ).unwrap()
        if not ret:
            self.second_reply_error_create.append(comment)

    def _pop_author_user_id(self, item, platform):
        """Remove ``item['user']`` and return the shadow user id for that author."""
        author = item.pop('user', None) or {}
        return self.get_user_id(id_=author.get('id'), platform=platform)

    def _create_comment_threads(self, comments, platform, owner_key, owner_id,
                                second_level, **rpc_kwargs):
        """Create the top-level comments of one topic/pictorial and queue their replies.

        Args:
            comments: crawled comment dicts (mutated in place).
            platform: crawl platform id, passed through to the RPC.
            owner_key: ``'topic_id'`` or ``'pictorial_id'``.
            owner_id: id of the created topic or pictorial.
            second_level: list that receives the prepared replies. They are
                created later through ``create_comment``.
            **rpc_kwargs: extra keyword arguments for the ``replys`` RPC call.
        """
        for comment in comments:
            comment[owner_key] = owner_id
            top_comment, replies = self.revise_comments(comment)
            top_comment['user_id'] = self._pop_author_user_id(top_comment, platform)
            ret = rpc_invoker['venus/community/crawl/replys'](
                data=[top_comment], platform=platform, **rpc_kwargs).unwrap()
            if not ret:
                self.top_comment_error_create.append(top_comment)
                continue
            if not replies:
                continue
            reply_ids = ret.get('reply_ids') or []
            if not reply_ids:
                # The RPC returned no id to hang the replies on, so they cannot be created.
                self.second_reply_error_create.extend(replies)
                continue
            top_id = reply_ids[0]
            for reply in replies:
                reply['user_id'] = self._pop_author_user_id(reply, platform)
                reply['top_id'] = top_id
                # create_comment reads the owner id from the reply itself.
                # Before this, replies were sent with topic_id/pictorial_id = None.
                reply[owner_key] = owner_id
            second_level.extend(replies)

    def create_topic(self, topics, platform):
        """Create crawled topics, their images and their top-level comments.

        Ids of topics that fail are collected in ``create_faild_topic_list``.
        On Xiaohongshu, replies to the created comments are queued in
        ``second_topic_comments``.

        Returns:
            ``(None, None)``, kept for callers that unpack the result.
        """
        for count, topic in enumerate(topics, start=1):
            topic_comments = topic.pop('comments', None)
            topic['images'] = self.image_info(topic.pop('image', None))
            # The topic author is keyed by the topic id, not by topic['user'].
            topic['user_id'] = self.get_user_id(id_=topic.get('id'), platform=platform)
            topic.pop('user', None)
            print('-------- topic current count: ', count)
            topic_obj = rpc_invoker['venus/community/crawl/topic'](
                data=topic, platform=platform, pictorial_id=None).unwrap()
            if not topic_obj:
                self.create_faild_topic_list.append(topic.get('id'))
                continue
            print('-------- return topic info: ', topic_obj)
            if topic_comments and platform == GRAP_PLATFORM.XIAOHONGSHU:
                topic_id = topic_obj.get('id')
                self._create_comment_threads(
                    topic_comments, platform, 'topic_id', topic_id,
                    self.second_topic_comments,
                    topic_id=topic_id, pictorial_id=None,
                )
        self.del_cache()
        return None, None

    def create_pictorial(self, pictorial, platform):
        """Create one crawled pictorial, one topic per image, and its top-level comments.

        A pictorial that fails to be created is appended to
        ``create_faild_pictorial_list``, and nothing else is created for it.
        On Xiaohongshu, replies to its comments are queued in
        ``second_pictorial_comments``.

        Returns:
            ``(None, None)``, kept for callers that unpack the result.
        """
        if not pictorial:
            return None, None
        pictorial_comments = pictorial.pop('comments', None)
        images = self.image_info(pictorial.pop('image', None))
        # Each uploaded image becomes its own single-image topic in the pictorial.
        topics = [
            {
                'id': '{}{}'.format(pictorial.get('id'), index),
                'content': pictorial.get('content'),
                'images': [image],
                'create_time': pictorial.get('create_time'),
                'user_id': self.get_user_id(id_=image.get('url'), platform=platform),
            }
            for index, image in enumerate(images, start=1)
        ]

        pictorial['user_id'] = self.get_user_id(id_=pictorial.get('id'), platform=platform)
        pictorial['description'] = pictorial.get('content')
        # Pictorial name: the first 20 characters of the crawled content.
        # The old code dropped the last character of shorter contents.
        pictorial['name'] = (pictorial.get('content') or '')[:20]

        pictorial_obj = rpc_invoker['venus/community/crawl/pictorial'](
            data=pictorial, platform=platform).unwrap()
        if not pictorial_obj:
            # Previously execution went on and crashed on pictorial_obj.get('id').
            self.create_faild_pictorial_list.append(pictorial)
            return None, None
        pictorial_id = pictorial_obj.get('id')

        for topic in topics:
            topic_obj = rpc_invoker['venus/community/crawl/topic'](
                data=topic, platform=platform, pictorial_id=pictorial_id).unwrap()
            if not topic_obj:
                self.create_faild_topic_list.append(topic.get('id'))

        if pictorial_comments and platform == GRAP_PLATFORM.XIAOHONGSHU:
            self._create_comment_threads(
                pictorial_comments, platform, 'pictorial_id', pictorial_id,
                self.second_pictorial_comments,
                pictorial_id=pictorial_id,
            )
        return None, None

    def handle(self, *args, **options):
        """Run the import in order: topics, pictorials, then second-level topic replies."""
        self.load_shadow_users()
        # NOTE(review): 4 is presumably GRAP_PLATFORM.XIAOHONGSHU — confirm.
        platform = 4

        # Topics
        print('----- start deal topic at {} -----'.format(self._now()))
        topic_data = self.get_json_data_from_dir(is_topic=1)
        self.create_topic(topics=topic_data, platform=platform)
        print('-------- create_faild_topic_list:', len(self.create_faild_topic_list))
        print('-------- second_topic_comments:', len(self.second_topic_comments))
        print('----- end deal topic at {} -----'.format(self._now()))

        # Pictorials
        print('----- start deal pictorial at {} -----'.format(self._now()))
        pictorial_data = self.get_json_data_from_dir(is_pictorial=1)
        for count, pictorial in enumerate(pictorial_data, start=1):
            print('------- current pictorial count :', count)
            self.create_pictorial(pictorial=pictorial, platform=platform)
        print('-------- create_faild_pictorial_list:', len(self.create_faild_pictorial_list))
        print('-------- top_comment_error_create:', len(self.top_comment_error_create))
        print('----- end deal pictorial at {} -----'.format(self._now()))

        # Second-level replies
        # NOTE(review): second_pictorial_comments are collected but never created here.
        print('----- start deal second topic reply at {} -----'.format(self._now()))
        replies = self.second_topic_comments
        if self.second_reply_limit is not None:
            replies = replies[:self.second_reply_limit]
        for count, reply in enumerate(replies, start=1):
            print('------- current second topic reply count :', count)
            self.create_comment(comment=reply, platform=platform)
        print('-------- second_reply_error_create:', len(self.second_reply_error_create))
        print('----- end deal second topic reply at {} -----'.format(self._now()))

        # Final failure summary. Each condition now prints the list it checks.
        if self.top_comment_error_create:
            print('-------- top_comment_error_create:', len(self.top_comment_error_create))
        if self.create_faild_topic_list:
            print('-------- create_faild_topic_list:', len(self.create_faild_topic_list))
        if self.create_faild_pictorial_list:
            print('-------- create_faild_pictorial_list:', len(self.create_faild_pictorial_list))
        self.del_cache()