# -*- coding:UTF-8 -*-
# @Time   : 2020/7/28 8:53
# @File   : cal_ni_and_put_to_backend.py
# @email  : litao@igengmei.com
# @author : litao

import redis
import json
import datetime
import time
import sys
import six
from crawler.maintenance.func_send_email_with_file import send_file_email
from typing import Dict, Iterator, List
from crawler.gm_upload.gm_upload import upload, upload_file
import requests
import os
import copy
import re
# import HTMLParser
import pymysql
from crawler.crawler_sys.utils.output_results import retry_get_url
from lxml import html
from lxml.html.clean import Cleaner
import random
# from mistune import Renderer, InlineGrammar, InlineLexer, Markdown, escape

rds = redis.StrictRedis(host='172.16.40.164', port=6379, db=19, password='ReDis!GmTx*0aN12')
# conn = pymysql.connect(host='bj-cdb-6slgqwlc.sql.tencentcdb.com', port=62120, user='work', passwd='Gengmei1',
#                        db='mimas_test', charset='utf8')
# rds.scan()


class push_rule(object):
    def __init__(self, repost_count_ni=None, comment_count_ni=None, favorite_count_ni=None,
                 time_range=5, level=3):
        """
        Incremental-growth push rule. For example, "favorite count grows by
        200 within 5 minutes" is favorite_count_ni=200, time_range=5.

        :param repost_count_ni: Int repost-count increment threshold
        :param comment_count_ni: Int comment-count increment threshold
        :param favorite_count_ni: Int favorite-count increment threshold
        :param time_range: Int interval in minutes
        """
        self._repost_count_ni = repost_count_ni
        self._comment_count_ni = comment_count_ni
        self._favorite_count_ni = favorite_count_ni
        self._time_range = time_range
        self.level = level
        # A threshold left as None falls back to -100 per minute, which any
        # non-negative growth rate satisfies.
        try:
            self.repost_per_min = self._repost_count_ni / time_range
        except TypeError:
            self.repost_per_min = -100
        try:
            self.comment_per_min = self._comment_count_ni / time_range
        except TypeError:
            self.comment_per_min = -100
        try:
            self.favorite_per_min = self._favorite_count_ni / time_range
        except TypeError:
            self.favorite_per_min = -100

    def parse_data(self, fetch_time_last=None, repost_count_last=None, comment_count_last=None,
                   favorite_count_last=None, fetch_time=None, repost_count=None,
                   comment_count=None, favorite_count=None, parse_mode="and") -> bool:
        """
        :param fetch_time_last: newer fetch timestamp, in milliseconds
        :param repost_count_last:
        :param comment_count_last:
        :param favorite_count_last:
        :param fetch_time: older fetch timestamp, in milliseconds
        :param repost_count:
        :param comment_count:
        :param favorite_count:
        :param parse_mode: str "and" or "or"; whether all thresholds must be met or any one of them
        :return:
        """
        if fetch_time_last and fetch_time:
            # Timestamps are in milliseconds; convert the difference to minutes.
            time_diff = (fetch_time_last - fetch_time) / 60 / 1e3
        else:
            raise KeyError("time input error")
        if isinstance(comment_count_last, int) and isinstance(comment_count, int):
            comment_diff = comment_count_last - comment_count
        else:
            comment_diff = -100
        if isinstance(favorite_count_last, int) and isinstance(favorite_count, int):
            favorite_diff = favorite_count_last - favorite_count
        else:
            favorite_diff = -100
        if isinstance(repost_count_last, int) and isinstance(repost_count, int):
            repost_diff = repost_count_last - repost_count
        else:
            repost_diff = -100
        print(datetime.datetime.now().timestamp(), fetch_time_last, fetch_time,
              comment_diff, favorite_diff, repost_diff)
        if parse_mode == "and":
            return (comment_diff / time_diff >= self.comment_per_min
                    and favorite_diff / time_diff >= self.favorite_per_min
                    and repost_diff / time_diff >= self.repost_per_min)
        elif parse_mode == "or":
            return (comment_diff / time_diff >= self.comment_per_min
                    or favorite_diff / time_diff >= self.favorite_per_min
                    or repost_diff / time_diff >= self.repost_per_min)
        else:
            return False
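
# Illustration only, not called by the pipeline: a minimal, self-contained
# sketch of how a push_rule judges two snapshots of the same post. All
# numbers are made up; timestamps are in milliseconds, ten minutes apart,
# matching the snapshot format scan_from_redis reads from Redis.
def _demo_push_rule():
    rule = push_rule(favorite_count_ni=500, comment_count_ni=450, time_range=10, level=4)
    hit = rule.parse_data(
        fetch_time_last=1596013416514,  # newer snapshot, ten minutes later
        favorite_count_last=800, comment_count_last=700, repost_count_last=50,
        fetch_time=1596012816514,       # older snapshot
        favorite_count=200, comment_count=200, repost_count=40,
        parse_mode="and",
    )
    # Favorites grew by 600 (60/min >= 500/10 per min) and comments by 500
    # (50/min >= 45/min); the unset repost threshold defaults to -100/min,
    # so every condition holds and hit is True.
    print(hit)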

def scan_from_redis(push_rule_class_list) -> Iterator[Dict]:
    # len_id_list = rds.llen("doc_id")
    while True:
        set_name = "exists_doc_id_set_%s" % datetime.datetime.now().strftime("%Y-%m-%d")
        rds.sadd(set_name, "test")
        rds.expire(set_name, 259200)
        doc_id = rds.lpop("doc_id")
        if doc_id:
            count_res = rds.llen(doc_id)
            if count_res < 2:
                continue
            res_list = rds.lrange(doc_id, 0, 1)
            fetch_time = 0
            repost_count = 0
            comment_count = 0
            favorite_count = 0
            for count, res in enumerate(res_list):
                # print(res)
                out_ts = datetime.datetime.now().timestamp() * 1e3 - 86400000
                one_data = json.loads(res)
                print(one_data.get('doc_id'))
                if count == 0:
                    # First (newest) snapshot becomes the "last" reference point.
                    fetch_time = one_data.get("fetch_time")
                    repost_count = one_data.get("repost_count")
                    comment_count = one_data.get("comment_count")
                    favorite_count = one_data.get("favorite_count")
                    continue
                if one_data.get("article_type") != "article":
                    continue
                for push_bool in push_rule_class_list:
                    bool_res = push_bool.parse_data(fetch_time_last=fetch_time,
                                                    repost_count_last=repost_count,
                                                    comment_count_last=comment_count,
                                                    favorite_count_last=favorite_count,
                                                    comment_count=one_data.get("comment_count"),
                                                    favorite_count=one_data.get("favorite_count"),
                                                    repost_count=one_data.get("repost_count"),
                                                    parse_mode="and",
                                                    fetch_time=one_data.get("fetch_time"))
                    # print(bool_res)
                    if bool_res:
                        one_data["level"] = push_bool.level
                        # Skip posts released more than 24 hours ago.
                        if one_data["release_time"] < out_ts:
                            continue
                        set_name = "exists_doc_id_set_%s" % datetime.datetime.fromtimestamp(
                            one_data["release_time"] / 1e3).strftime("%Y-%m-%d")
                        # Deduplicate: each doc_id is yielded at most once per release day.
                        if rds.sismember(set_name, one_data["doc_id"]):
                            continue
                        else:
                            rds.sadd(set_name, one_data["doc_id"])
                            yield one_data
                            continue
            # print(res_list)
        # else:
        #     time.sleep(1)


WHITE_TAGS = {
    "basic": ["div", "p", "span", "img", "br", "video", 'a'],  # tentative: mini-program and crawled data
    "all": [
        "div", "p", "span", "img", "br", "video", "audio", "a", "b", "strong", "i",
        "ul", "ol", "li", "em", "h1", "h2", "h3", "h4", "h5", "h6", "iframe",
    ]  # all whitelisted tags that may be displayed
}


# def _get_rich_text(rich_text):
#     """
#     Unescape rich-text entities back into tags.
#     :param rich_text:
#     :return:
#     """
#     try:
#         h = HTMLParser.HTMLParser()
#         rich_text = h.unescape(rich_text.decode("utf-8").replace("&amp;", "&").replace("\n", "<br>"))
#         return rich_text
#     except:
#         return rich_text


def gm_convert_html_tags(rich_text, all_tags=False, remove_tags=None):
    """
    Re-clean rich-text content, stripping unwanted styles.
    :param rich_text: rich text
    :param all_tags: whether to allow every tag on the whitelist
    :param remove_tags: whitelist tags to drop, as a list
    :return:
    """
    if not rich_text:
        return ""

    # rich_text = _get_rich_text(rich_text)

    # Tag cleaning and completion parameters
    tags = WHITE_TAGS["all"] if all_tags else WHITE_TAGS["basic"]
    if remove_tags:
        tags = [tag for tag in tags if tag not in remove_tags]
    kw = {
        "remove_unknown_tags": False,
        "allow_tags": tags,
        "safe_attrs": ["src", ],
    }
    if "a" in tags:
        kw["safe_attrs"].append("href")
    elif all_tags:
        kw["safe_attrs"].extend(["class", "style"])
    if "iframe" in kw["allow_tags"]:
        kw["embedded"] = False
    clear = Cleaner(**kw)
    rich_text = clear.clean_html(rich_text)

    # Add styles
    element_obj = html.fromstring(rich_text)
    for element in element_obj.xpath(u"//img|//video"):
        if not all_tags:  # mini-program, ordinary users, crawled data
            element.attrib["width"] = "100%"  # give images and videos 100% width
        if element.tag == "video" and all_tags:
            element.attrib["class"] = "js_richtext_video"

    # Remove <a> tags whose href does not start with gengmei://
    for item in element_obj.xpath("//a[not(starts-with(@href, 'gengmei://'))]"):
        item.getparent().remove(item)

    # Append the link color style to the remaining <a> tags
    for item in element_obj.xpath("//a"):
        item.attrib["style"] = 'color:#3FB5AF'

    rich_text = html.tostring(element_obj, encoding="unicode")
    return rich_text
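
# Illustration only, not called by the pipeline: what gm_convert_html_tags
# does to a crawled snippet. The markup below is made up. With the default
# basic whitelist the <script> element is dropped by the Cleaner, the plain
# http link is removed by the gengmei:// filter, and the image gets a
# width="100%" attribute.
def _demo_gm_convert_html_tags():
    dirty = ('<div><script>alert(1)</script>'
             '<a href="http://example.com">spam</a>'
             '<a href="gengmei://page">keep</a>'
             '<img src="https://img.example.com/a.jpg"></div>')
    print(gm_convert_html_tags(dirty))
    # Roughly: <div><a href="gengmei://page" style="color:#3FB5AF">keep</a>
    #          <img src="https://img.example.com/a.jpg" width="100%"></div>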

def push_data_to_user(res_data: Dict) -> Dict:
    """
    Massage crawled data into a format that can be written to the database.
    :param res_data:
    :return:
    """
    qiniu_img_list = []
    if res_data["img_list"]:
        for img_url in res_data["img_list"]:
            try:
                img_wb = retry_get_url(img_url).content
                res = upload(img_wb, img_type=99)
                print(res)
                img_info = retry_get_url(res + "-imageinfo")
                img_info_json = img_info.json()
                qiniu_img_list.append((res + "-w", img_info_json))
            except Exception as e:
                print("download img error %s" % e)
                return {}

    # Replace the source images with their qiniu copies
    if res_data["platform"] == "weibo":
        res_data["qiniu_img_list"] = qiniu_img_list
        try:
            if qiniu_img_list:
                for qiniu_img in qiniu_img_list:
                    res_data["title"] = res_data["title"] + '<img src="%s">' % qiniu_img[0]
        except Exception as e:
            print("append img error %s" % e)
        if "http://t.cn/" in res_data["title"]:
            res_data["title"] = res_data["title"].split("http://t.cn/")[0]
        res_data["content"] = res_data["title"]
    elif res_data["platform"] == "douban":
        content = res_data.get("content")
        if content:
            for count, img_url in enumerate(res_data["img_list"]):
                # print(qiniu_img_list[count][0])
                content = content.replace(img_url, qiniu_img_list[count][0])
        res_data["qiniu_img_list"] = qiniu_img_list
        res_data["content"] = content

    if res_data["platform"] == "weibo":
        res_data["content"] = gm_convert_html_tags(res_data["title"], all_tags=True)
        res_data["title"] = ""
    elif res_data["platform"] == "douban":
        res_data["content"] = gm_convert_html_tags(res_data["content"], all_tags=True)
    return res_data


user_id_list = [33524704, 33524711, 33524716, 33524731, 33524740, 33524697, 33524707,
                33524712, 33524717, 33524724, 33524755, 33524762, 33524779, 33524766, 33524782]

img_type = {
    "OTHER": 1,  # other image type
    "GIF": 2,    # GIF animation
    "JPG": 3,    # JPG image
    "JPEG": 4,   # JPEG image
    "PNG": 5,    # PNG image
    "BMP": 6,    # BMP bitmap
    "WEBP": 7,   # WEBP image
    "TIFF": 8,   # TIFF image
}
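
# Illustration only: the minimal shape of res_data that push_data_to_user and
# write_data_into_mysql expect, condensed from the commented-out sample near
# the bottom of this file. All values here are placeholders.
_EXAMPLE_RES_DATA = {
    "platform": "douban",              # "weibo" or "douban"
    "doc_id": "douban_186707979",      # stored as platform_id for dedup
    "title": "...",
    "content": "<div>...</div>",       # raw crawled HTML
    "img_list": ["https://img.example.com/a.webp"],  # source image urls
    "release_time": 1595952037000,     # milliseconds
    "fetch_time": 1596012816514,       # milliseconds
    "releaser": "...",
    "repost_count": 40,
    "comment_count": 411,
    "favorite_count": 144,
    "level": 5,                        # filled in by scan_from_redis from the matched rule
}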

def write_data_into_mysql(res_data):
    conn = pymysql.connect(host='172.16.30.138', port=3306, user='mimas',
                           passwd='GJL3UJe1Ck9ggL6aKnZCq4cRvM', db='mimas_prod', charset='utf8mb4')
    cur = conn.cursor()
    now_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Massage the data into an insertable shape
    data = push_data_to_user(res_data)
    if not data.get("content"):
        return None
    if not data.get("qiniu_img_list"):
        return None
    tractate_id = 0
    if data["platform"] == "weibo":
        platform_value = 11
    elif data["platform"] == "douban":
        platform_value = 12
    else:
        return None
    try:
        # Values are interpolated into the SQL string directly; a parameterized
        # variant is sketched below this function.
        sql_query = """insert into api_tractate
        (user_id,content,is_online,status,platform,content_level,is_excellent,create_time,last_modified,user_del,low_quality,low_quality_deal,platform_id,pgc_type,title)
        values ({user_id},'{content}',{is_online},{status},{platform},{content_level},{is_excellent},'{create_time}','{last_modified}',{user_del},{low_quality},{low_quality_deal},'{platform_id}',{pgc_type},'{title}');""".format(
            user_id=random.choice(user_id_list), content=data["content"], is_online=0, status=2,
            platform=platform_value, content_level=data["level"], is_excellent=0,
            create_time=now_str, last_modified=now_str, user_del=0, low_quality=0,
            low_quality_deal=0, platform_id=data["doc_id"], pgc_type=0, title=data["title"])
        res = cur.execute(sql_query)
        tractate_id = int(conn.insert_id())
        if res:
            conn.commit()
    except Exception as e:
        print("commit error %s" % e)
        print(data)
        conn.rollback()

    if data.get("qiniu_img_list"):
        for img_info in data.get("qiniu_img_list"):
            if img_info[0] in data.get("content"):
                image_url_source = 2
            else:
                image_url_source = 3
            try:
                image_type = img_type.get(img_info[1]["format"].upper(), 1)
            except Exception:
                image_type = 1
            try:
                width = img_info[1]["width"]
                height = img_info[1]["height"]
            except Exception:
                width = 0
                height = 0
            try:
                if image_type == 7:  # WEBP images also store an image_webp url
                    sql_query = """insert into api_tractate_images
                    (tractate_id,image_url,width,height,image_webp,image_url_source,image_type,create_time,update_time)
                    values ({tractate_id},'{image_url}',{width},{height},'{image_webp}',{image_url_source},{image_type},'{create_time}','{update_time}')""".format(
                        tractate_id=tractate_id, image_url=img_info[0], width=width, height=height,
                        image_webp=img_info[0], image_url_source=image_url_source,
                        image_type=image_type, create_time=now_str, update_time=now_str)
                else:
                    sql_query = """insert into api_tractate_images
                    (tractate_id,image_url,width,height,image_url_source,image_type,create_time,update_time)
                    values ({tractate_id},'{image_url}',{width},{height},{image_url_source},{image_type},'{create_time}','{update_time}')""".format(
                        tractate_id=tractate_id, image_url=img_info[0], width=width, height=height,
                        image_url_source=image_url_source, image_type=image_type,
                        create_time=now_str, update_time=now_str)
                res = cur.execute(sql_query)
                if res:
                    conn.commit()
            except Exception as e:
                print("commit error %s" % e)
                conn.rollback()
    cur.close()
    conn.close()
    if tractate_id:
        return tractate_id
    else:
        return None
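
# Illustration only, not wired into the pipeline: a minimal sketch of the
# api_tractate insert above using pymysql parameter binding instead of
# str.format, so quotes or backslashes inside content/title cannot break the
# statement. It assumes an open pymysql connection like the one built in
# write_data_into_mysql.
def _insert_tractate_parameterized(conn, data, platform_value, now_str):
    sql = ("insert into api_tractate "
           "(user_id,content,is_online,status,platform,content_level,is_excellent,"
           "create_time,last_modified,user_del,low_quality,low_quality_deal,"
           "platform_id,pgc_type,title) "
           "values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
    with conn.cursor() as cur:
        cur.execute(sql, (random.choice(user_id_list), data["content"], 0, 2,
                          platform_value, data["level"], 0, now_str, now_str,
                          0, 0, 0, data["doc_id"], 0, data["title"]))
        conn.commit()
        return int(conn.insert_id())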

class EnumMeta(type):
    """Metaclass for Enum"""

    @staticmethod
    def _find_allow_types_(cls, bases):
        all_types = set(six.integer_types) | {six.text_type, str}
        allow_types = set()
        if Enum is None:  # the Enum base class itself is being created
            assert cls == 'Enum'
            return tuple(all_types)
        else:
            for base in bases:
                if not issubclass(base, Enum):
                    allow_types.add(base)
            if allow_types:
                return tuple(all_types & allow_types)
            else:
                return tuple(all_types)


# Sentinel so _find_allow_types_ can detect the creation of Enum itself.
Enum = None


class Enum(six.with_metaclass(EnumMeta, object)):
    """Generic enumeration.

    Derive from this class to define new enumerations.
    """

    def __repr__(self):
        return "<%s.%s: %r>" % (self.__class__.__name__, self._name_, self._value_)

    def __str__(self):
        if self._desc_:
            return "%s.%s(%s)" % (self.__class__.__name__, self._name_, self._desc_)
        else:
            return "%s.%s" % (self.__class__.__name__, self._name_)

    def __hash__(self):
        return hash(self._name_)


class TRACTATE_PLATFORM(Enum):
    """Source a new post was published from"""
    GM = ("1", u"更美")
    HERA = ("2", u"HERA后台")
    DOCTOR = ("3", u"医生端")
    XIAOHONGSHU = ("4", u"小红书")
    WEIBO = ("5", u"微博")
    SOYOUNG = ("6", u"新氧")
    MARK = ("7", u"站内打卡活动")
    VARIETY_SHOW_YOUNG = ("8", "选秀节目(少年之名)打榜活动")
    GROUP_DETAIL = ("9", "普通小组")
    GROUP_TOPIC_DETAIL = ("10", "普通小组话题")
    STRATEGY_WEIBO_HOTSPOT = ("11", "策略微博热点")
    STRATEGY_DOUBAN_HOTSPOT = ("12", "策略豆瓣鹅组热点")
    STRATEGY_TOUTIAO = ("13", "策略头条文章")
    STRATEGY_ZHIHU = ("14", "策略知乎文章")
    STRATEGY_XIAOHONGSHU = ("15", "策略小红书文章")
    STRATEGY_SOYOUNG = ("16", "策略新氧文章")
    STRATEGY_WEIBO = ("17", "策略微博文章")


def task_main():
    # Instantiate the judging rules; higher-priority rules must come first.
    push_rule_class1 = push_rule(favorite_count_ni=500, comment_count_ni=450, time_range=10, level=4)
    push_rule_class2 = push_rule(favorite_count_ni=400, comment_count_ni=350, time_range=10, level=3)
    # push_rule_class3 = push_rule(comment_count_ni=1, time_range=10, level=3)
    rules_list = [
        push_rule_class1,
        push_rule_class2,
        # push_rule_class3
    ]
    # Loop over the crawled data and handle whatever should be pushed to the backend
    for res_data in scan_from_redis(rules_list):
        # Write matching data to the database
        try:
            print("start to write to mysql", res_data)
            tractate_id = write_data_into_mysql(res_data)
            # tractate_id = "test"
            print("done writing to mysql", tractate_id, res_data)
            if res_data["level"] >= 3 and tractate_id:
                content = res_data.get("content") or res_data.get("title")
                title_str = res_data["platform"] + "帖子内容审核"
                body_str = """
                问好:
                有一篇新的{level}星内容需要审核,帖子号为{tractate_id},发布者为{releaser}
                内容如下:
                {content}
                """.format(tractate_id=tractate_id, content=content, level=res_data["level"],
                           releaser=res_data["releaser"])
                send_file_email("", "", email_group=[
                    # "<litao@igengmei.com>"
                    "<liulingling@igengmei.com>", "<liujinhuan@igengmei.com>",
                    "<hongxu@igengmei.com>", "<yangjiayue@igengmei.com>",
                    "<zhangweiwei@igengmei.com>", "<liuyiting@igengmei.com>",
                    "<cp-sunyinghe@igengmei.com>",
                ], cc_group=["<duanyingrong@igengmei.com>", "<litao@igengmei.com>"],
                    email_msg_body_str=body_str, title_str=title_str)
                print("audit email sent")
        except Exception as e:
            print("send email error %s" % e)

# test = {'release_time': 1595952037000, 'fetch_time': 1596012816514, 'url': 'https://www.douban.com/group/topic/186707979/', 'releaser': '🍫', 'repost_count': 40, 'comment_count': 411, 'favorite_count': 144, 'title': '王俊凯终于还是举铁了', 'releaserUrl': 'https://www.douban.com/people/57762442', 'releaser_id_str': 'douban_57762442', 'video_img': 'https://img3.doubanio.com/view/group_topic/sqxs/public/p317684082.webp', 'mid': '186707979', 'platform': 'douban', 'doc_id': 'douban_186707979', 'content': '<div id=\'content\'><div class="image-container image-float-center"><div class="image-wrapper"><img src="https://img3.doubanio.com/view/group_topic/l/public/p317684082.webp" width="500"/></div></div><div class="image-container image-float-center"><div class="image-wrapper"><img src="https://img9.doubanio.com/view/group_topic/l/public/p317684064.webp" width="500"/></div></div><div class="image-container image-float-center"><div class="image-wrapper"><img src="https://img3.doubanio.com/view/group_topic/l/public/p317684093.webp" width="500"/></div></div><div class="image-container image-float-center"><div class="image-wrapper"><img src="https://img9.doubanio.com/view/group_topic/l/public/p317684095.webp" width="500"/></div></div><div class="image-container image-float-center"><div class="image-wrapper"><img src="https://img3.doubanio.com/view/group_topic/l/public/p317684052.webp" width="500"/></div></div><p></p></div>', 'collection_count': 107, 'img_list': ['https://img3.doubanio.com/view/group_topic/l/public/p317684082.webp', 'https://img9.doubanio.com/view/group_topic/l/public/p317684064.webp', 'https://img3.doubanio.com/view/group_topic/l/public/p317684093.webp', 'https://img9.doubanio.com/view/group_topic/l/public/p317684095.webp', 'https://img3.doubanio.com/view/group_topic/l/public/p317684052.webp'], 'level': 5}
# write_data_into_mysql(test)

if __name__ == "__main__":
    # from concurrent.futures import ProcessPoolExecutor
    # executor = ProcessPoolExecutor(max_workers=5)
    # futures = []
    # for processe in range(4):
    #     future = executor.submit(task_main)
    #     futures.append(future)
    #     print('Processe %s start' % processe)
    # executor.shutdown(True)
    task_main()