# Read crawled JSON files and import their posts and comments into the database.
import json
import os
from datetime import datetime
from random import randint

import requests
from django.core.management import BaseCommand
from gm_types.mimas import TRACTATE_PLATFORM
from gm_upload import IMG_TYPE
from gm_upload import upload, upload_file

from talos.cache.base import crawl_user_cache
from talos.logger import info_logger, exception_logger
from talos.models.tractate import Tractate, TractateImages, TractateReply, TractateExtra
from talos.services.user import UserService

FILE_PATH = '/Users/haowei/Desktop/gm/'
TRACTATE_YEAR = 2019

# Upper bound on attempts for each flaky network step (image upload, size lookup).
MAX_RETRIES = 5
# Seconds to wait on a socket operation before giving up on an HTTP request.
REQUEST_TIMEOUT = 30
# Number of earliest replies per post that are switched online after the import.
ONLINE_REPLY_LIMIT = 50


def _now_text():
    """Return the current local time formatted for the progress messages."""
    return datetime.now().strftime('%Y-%m-%d %H:%M:%S %f')


def _replace_year(moment, year):
    """Return ``moment`` moved to ``year``.

    ``datetime.replace`` raises ``ValueError`` when the source date is Feb 29
    and the target year is not a leap year; in that case Feb 28 is used.
    """
    try:
        return moment.replace(year=year)
    except ValueError:
        return moment.replace(year=year, day=28)


def upload_image(url, img_type=IMG_TYPE.TOPIC):
    """Download an off-site image and re-upload it through ``gm_upload``.

    Args:
        url: address of the remote image.
        img_type: ``IMG_TYPE`` value forwarded to ``upload``.

    Returns:
        Whatever ``upload`` returns on success, or ``None`` when the download
        or the upload fails (the failure is logged).
    """
    try:
        response = requests.get(url, timeout=REQUEST_TIMEOUT)
        # Do not upload the body of an HTTP error page as if it were an image.
        response.raise_for_status()
        return upload(response.content, img_type=img_type)
    except Exception as e:
        exception_logger.error(e)
        return None


class Command(BaseCommand):
    """Import crawled posts (tractates) and their comments."""

    # First id of the user-id range that random authors are drawn from.
    user_id_start = 20893372
    # NOTE: the lists below are class-level and therefore shared by every
    # instance of the command.
    # Cache keys that del_cache() removes.
    del_cache_keys = []
    # Platform ids of posts that could not be created.
    create_faild_topic_list = []
    # Top-level comments that could not be created.
    top_comment_error_create = []
    # Second-level replies collected by create_topic(), created later in handle().
    second_topic_comments = []
    second_topic_error_comments = []
    # Second-level replies whose parent comment could not be found.
    second_reply_error_create = []
    # Database ids of the posts touched by this run.
    insert_topic_ids = []

    def del_cache(self):
        """Delete every cache key recorded in ``del_cache_keys``."""
        for key in self.del_cache_keys:
            crawl_user_cache.delete(key)

    def get_random_user_id(self):
        """Return a random existing user id.

        Candidates are ``user_id_start`` plus an offset in ``[0, 2000]``;
        the loop keeps drawing until ``UserService`` returns a user whose id
        matches the candidate.
        """
        while True:
            user_id = self.user_id_start + randint(0, 2000)
            data = UserService.get_user_by_user_id(user_id)
            if data and data.id == user_id:
                return user_id

    def get_user_id(self, id_, platform):
        """Return the local user id mapped to a crawled id, creating the mapping if needed.

        Args:
            id_: id taken from the crawled data.
            platform: platform identifier used in the cache keys.

        Returns:
            The cached user id as ``int`` when a mapping exists, otherwise a
            freshly picked random user id that is then cached.
        """
        cache_key = 'grap:{}:{}'.format(platform, id_)
        exist_key = 'grap:{}:{}'

        value = crawl_user_cache.get(cache_key)
        if value:
            return int(value)

        # Draw users until one is found that has no marker in the cache yet.
        while True:
            user_id = self.get_random_user_id()
            exist = exist_key.format(platform, user_id)
            if not crawl_user_cache.get(exist):
                crawl_user_cache.set(exist, id_)
                self.del_cache_keys.append(exist)
                break

        crawl_user_cache.set(cache_key, user_id)
        # NOTE(review): the original appended `exist` to del_cache_keys a
        # second time here, which is redundant. Whether `cache_key` was meant
        # instead is unclear, so the id -> user mapping is left in the cache.
        return user_id

    def get_json_data_from_dir(self, is_topic=None, is_pictorial=None):
        """Load every JSON file found in the leaf directories of the selected folder.

        Args:
            is_topic: truthy to read from ``FILE_PATH + 'xiaohongshuyiqi/'``.
            is_pictorial: truthy to read from ``FILE_PATH + 'pictorial/'``;
                takes precedence when both flags are set.

        Returns:
            A list with the parsed content of each file; empty when neither
            flag is set.
        """
        file_path = None
        if is_topic:
            file_path = FILE_PATH + 'xiaohongshuyiqi/'
        if is_pictorial:
            file_path = FILE_PATH + 'pictorial/'
        if file_path is None:
            # Nothing selected: previously this raised UnboundLocalError.
            return []

        filenames = []
        for root, dirs, names in os.walk(file_path):
            # Only directories without sub-directories contribute files.
            if not dirs:
                filenames.extend(os.path.join(root, item) for item in names)

        return [self.get_file_json_data(filename) for filename in filenames]

    def get_file_json_data(self, file):
        """Read ``file`` and return its parsed JSON content.

        A leading byte-order mark, if present, is removed before parsing.
        """
        with open(file, 'r') as f:
            content = f.read()
        if content.startswith(u'\ufeff'):
            content = content[1:]
        return json.loads(content)

    def get_image_size(self, image_url):
        """Return ``(width, height)`` of an uploaded image.

        The values are read from the JSON served at ``image_url + '-imageinfo'``.
        On any failure the error is logged and ``(None, None)`` is returned.
        """
        try:
            response = requests.get(image_url + '-imageinfo', timeout=REQUEST_TIMEOUT)
            info = response.json()
            return info.get('width'), info.get('height')
        except Exception as e:
            exception_logger.error(e)
            return None, None

    def image_info(self, urls):
        """Re-upload each image and collect its stored url and dimensions.

        Each network step is attempted at most ``MAX_RETRIES`` times; an image
        that still fails is logged and skipped instead of retrying forever.

        Args:
            urls: iterable of remote image addresses.

        Returns:
            A list of dicts with the keys ``url``, ``height`` and ``width``.
        """
        ret = []
        for url in urls:
            image_url = None
            for _ in range(MAX_RETRIES):
                image_url = upload_image(url)
                if image_url:
                    break
            if not image_url:
                exception_logger.error(
                    'image upload failed after {} attempts: {}'.format(MAX_RETRIES, url))
                continue

            width = height = None
            for _ in range(MAX_RETRIES):
                width, height = self.get_image_size(image_url)
                if width or height:
                    break
            if not width and not height:
                exception_logger.error(
                    'image size lookup failed after {} attempts: {}'.format(MAX_RETRIES, image_url))
                continue

            ret.append(
                {
                    'url': image_url.replace('http://alpha.gmeiapp.com/', ''),
                    'height': height,
                    'width': width,
                }
            )
        return ret

    def revise_comments(self, comment):
        """Split a crawled top-level comment from its nested replies.

        The comment dict is modified in place: ``content`` is copied from
        ``comment``, ``replied_id`` is set to 0 and ``reply`` is removed.
        Every reply gets ``reply_id`` (the parent's id) and ``type`` (the
        parent's type).

        Returns:
            ``(comment, replies)`` where ``replies`` is a possibly empty list.
        """
        comment['content'] = comment.get('comment')
        comment['replied_id'] = 0

        replies = []
        # NOTE(review): replies are later stored using their 'content' key,
        # which is not filled in here - confirm the crawled replies carry it.
        for info in comment.pop('reply', None) or []:
            info['reply_id'] = comment.get('id')
            info['type'] = comment.get('type')
            replies.append(info)
        return comment, replies

    def create_topic(self, topics, platform):
        """Create posts with their top-level comments and queue second-level replies.

        Posts that already exist are reused. Second-level replies are only
        collected into ``second_topic_comments``; ``handle`` creates them.

        Args:
            topics: list of crawled post dicts.
            platform: platform identifier of the crawled data.

        Returns:
            Always ``(None, None)``.
        """
        count = 0
        # NOTE(review): the slice limits the import to the second entry only;
        # it looks like a debugging leftover - confirm before widening it.
        for topic in topics[1:2]:
            topic_comments = topic.pop('comments', None)
            topic_exist = Tractate.objects.filter(
                platform=platform, platform_id=topic.get('id')).first()

            if topic_exist:
                topic_id = topic_exist.id
            else:
                count += 1
                images = topic.pop('image', None) or []
                topic['images'] = self.image_info(images)
                # NOTE(review): the post's own id (not its author's id) is the
                # key for the user mapping here - verify this is intended.
                topic['user_id'] = self.get_user_id(id_=topic.get('id'), platform=platform)
                topic.pop('user', None)
                print('-------- topic current count: ', count)
                topic_id = self.topic_create(data=topic, platform=platform)
                if not topic_id:
                    self.create_faild_topic_list.append(topic.get('id'))
                    continue

            self.insert_topic_ids.append(topic_id)
            print('-------- return topic info: ', topic_id)

            if not topic_comments or platform != TRACTATE_PLATFORM.XIAOHONGSHU:
                continue

            for topic_comment in topic_comments:
                topic_comment['topic_id'] = topic_id
                top_comment, comments = self.revise_comments(topic_comment)
                top_comment['user_id'] = self.get_user_id(
                    id_=top_comment.get('user').get('id'), platform=platform)
                top_comment.pop('user')

                top_id = self.comment_create(
                    data=top_comment, platform=platform, topic_id=topic_id, top_id=0)
                if not top_id:
                    self.top_comment_error_create.append(top_comment)
                    continue
                if not comments:
                    continue

                for obj in comments:
                    obj['user_id'] = self.get_user_id(
                        id_=obj.get('user').get('id'), platform=platform)
                    obj['top_id'] = top_id
                    obj.pop('user')
                self.second_topic_comments.extend(comments)

        self.del_cache()
        return None, None

    def topic_create(self, data, platform):
        """Create an offline post with its images, or return the existing one.

        The creation time is taken from ``data['create_time']`` (a unix
        timestamp) with its year replaced by ``TRACTATE_YEAR``.

        Returns:
            The database id of the post.
        """
        platform_id = data.get('id')
        obj = Tractate.objects.filter(platform=platform, platform_id=platform_id).first()
        if obj:
            return obj.id

        obj = Tractate()
        obj.user_id = data.get('user_id')
        obj.content = data.get('content')
        obj.platform = platform
        obj.platform_id = platform_id
        obj.is_online = False
        obj.save()

        # datetime objects are immutable: replace() returns a new value, so
        # the result has to be kept (the original discarded it).
        create_time = _replace_year(
            datetime.fromtimestamp(data.get('create_time')), TRACTATE_YEAR)
        # The time is written with a queryset update after the initial save.
        Tractate.objects.filter(
            platform=platform, platform_id=platform_id).update(create_time=create_time)

        image_list = [
            TractateImages(
                tractate_id=obj.id,
                image_url=image.get('url'),
                width=image.get('width'),
                height=image.get('height'),
            )
            for image in data.get('images') or []
        ]
        TractateImages.objects.bulk_create(image_list)
        return obj.id

    def comment_create(self, data, platform, topic_id, top_id=0):
        """Create an offline comment on a post, or return the existing one.

        When ``data['reply_id']`` refers to an already stored comment, the new
        comment is attached to it (same ``top_id``, ``replied_id`` set to it).

        Returns:
            The database id of the comment, or ``None`` when ``data`` is empty.
        """
        if not data:
            return None

        create_time = datetime.fromtimestamp(data.get('create_time'))
        platform_id = data.get('id')
        obj = TractateReply.objects.filter(platform=platform, platform_id=platform_id).first()
        if obj:
            return obj.id

        replied = None
        reply_id = data.get('reply_id')
        if reply_id:
            replied = TractateReply.objects.filter(
                platform=platform, platform_id=reply_id).first()

        # create() returns the saved instance, so no second lookup is needed.
        obj = TractateReply.objects.create(
            tractate_id=topic_id,
            top_id=replied.top_id if replied else top_id,
            platform=platform,
            platform_id=platform_id,
            content=data.get('content'),
            user_id=data.get('user_id'),
            replied_id=replied.id if replied else data.get('replied_id'),
            create_time=create_time,
            is_online=False,
        )
        return obj.id

    def second_comment_create(self, comment, platform):
        """Create an offline second-level reply under its stored parent comment.

        Nothing happens when ``comment`` is empty or already stored. When the
        parent (looked up through ``comment['reply_id']``) is missing, the
        reply is recorded in ``second_reply_error_create`` and skipped.
        """
        if not comment:
            return

        platform_id = comment.get('id')
        if TractateReply.objects.filter(platform=platform, platform_id=platform_id).first():
            return

        replied = TractateReply.objects.filter(
            platform=platform, platform_id=comment.get('reply_id')).first()
        if not replied:
            # Without the parent neither the post nor the thread is known;
            # previously this crashed with AttributeError.
            self.second_reply_error_create.append(comment)
            return

        TractateReply.objects.create(
            tractate_id=replied.tractate_id,
            replied_id=replied.id,
            content=comment.get('content'),
            top_id=replied.id,
            user_id=comment.get('user_id'),
            create_time=datetime.fromtimestamp(comment.get('create_time')),
            platform=platform,
            platform_id=platform_id,
            is_online=False,
        )

    def update_reply_online(self, topic_ids):
        """Switch the earliest ``ONLINE_REPLY_LIMIT`` replies of each post online."""
        for topic_id in topic_ids:
            earliest = TractateReply.objects.filter(tractate_id=topic_id).order_by('create_time')
            reply_ids = list(earliest.values_list('id', flat=True)[:ONLINE_REPLY_LIMIT])
            TractateReply.objects.filter(id__in=reply_ids).update(is_online=True)

    def handle(self, *args, **options):
        """Run the import: posts and top-level comments first, then second-level replies."""
        platform = TRACTATE_PLATFORM.XIAOHONGSHU

        # Posts and their top-level comments.
        print('----- start deal topic at {} -----'.format(_now_text()))
        topic_data = self.get_json_data_from_dir(is_topic=1)
        self.create_topic(topics=topic_data, platform=platform)
        print('-------- create_faild_topic_list:', len(self.create_faild_topic_list))
        print('-------- second_topic_comments:', len(self.second_topic_comments))
        print('----- end deal topic at {} -----'.format(_now_text()))

        # Second-level replies, created once all top-level comments exist.
        print('----- start deal second topic reply at {} -----'.format(_now_text()))
        for count, topic_comment in enumerate(self.second_topic_comments, 1):
            print('------- current second topic reply count :', count)
            self.second_comment_create(comment=topic_comment, platform=platform)
        print('-------- second_reply_error_create:', len(self.second_reply_error_create))
        print('----- end deal second topic reply at {} -----'.format(_now_text()))

        if self.top_comment_error_create:
            # The original printed the second_topic_comments label and size here.
            print('-------- top_comment_error_create:', len(self.top_comment_error_create))
        if self.create_faild_topic_list:
            print('-------- create_faild_topic_list:', len(self.create_faild_topic_list))

        self.del_cache()
        self.update_reply_online(topic_ids=self.insert_topic_ids)