# -*- coding:UTF-8 -*-
# @Time : 2020/8/11 14:33
# @File : crawler_week_report.py
# @email : litao@igengmei.com
# @author : litao
# coding=utf-8
#
# Weekly crawler report: pulls search-word and crawled-content statistics
# from Hive (via Spark SQL), Elasticsearch and MySQL, writes them into an
# .xls workbook and mails it out.

import json
import pymysql
import xlwt, datetime
import redis
# from pyhive import hive
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
from elasticsearch_7 import Elasticsearch
from elasticsearch_7.helpers import scan
import sys
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import lit
import pytispark.pytispark as pti

# Wall-clock start of the job (not currently read anywhere visible in this file).
startTime = time.time()

# Spark / TiSpark configuration.
# NOTE(review): database credentials are hard-coded in the JDBC URIs below —
# consider moving them into a secrets store / config file.
sparkConf = SparkConf()
sparkConf.set("spark.sql.crossJoin.enabled", True)
sparkConf.set("spark.debug.maxToStringFields", "100")
sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
sparkConf.set("spark.tispark.plan.allow_index_read", True)
sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
sparkConf.set("mapreduce.map.output.compress", False)
sparkConf.set("prod.gold.jdbcuri",
              "jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true")
sparkConf.set("prod.mimas.jdbcuri",
              "jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true")
sparkConf.set("prod.gaia.jdbcuri",
              "jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true")
sparkConf.set("prod.tidb.jdbcuri",
              "jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.jerry.jdbcuri",
              "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
# NOTE(review): the same key is set twice — the second call overwrites the
# first, and "172.16.40.170:4000" looks like a TiDB SQL port rather than a
# PD address (PD is usually :2379, as used in the session builder below).
# Confirm which address is intended.
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
sparkConf.set("prod.tidb.database", "jerry_prod")

# Spark session with TiSpark extensions and Hive support enabled.
spark = (SparkSession.builder.config(conf=sparkConf).config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
         .config("spark.tispark.pd.addresses", "172.16.40.170:2379").appName("LR PYSPARK TEST").enableHiveSupport().getOrCreate())

# Register shared Hive UDF jars and the UDFs used by the report queries.
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")

# Redis client (not referenced by the code visible in this file).
rds = redis.StrictRedis(host='172.16.40.164', port=6379, db=19, password='ReDis!GmTx*0aN12')

# mimas_prod MySQL connection — used to count api_tractate rows.
conn_mimas = pymysql.connect(host='172.16.30.138', port=3306, user='mimas',
                             passwd='GJL3UJe1Ck9ggL6aKnZCq4cRvM', db='mimas_prod', charset='utf8mb4')
cur_mimas = conn_mimas.cursor()

# doris_prod MySQL connection — used for api_search_words aggregation
# (DictCursor: rows come back as dicts keyed by column name).
db_zhengxing_eagle = pymysql.connect(host="172.16.30.136", port=3306, user="doris", password="o5gbA27hXHHm",
                                     db="doris_prod", charset='utf8',
                                     cursorclass=pymysql.cursors.DictCursor)
zhengxing_cursor = db_zhengxing_eagle.cursor()

# Main content Elasticsearch cluster (tractate index).
es = Elasticsearch([
    {
        'host': '172.16.31.17',
        'port': 9200,
    }, {
        'host': '172.16.31.11',
        'port': 9200,
    }])


# Dead code kept for reference: a pyhive connection helper.
# class Hive_conn():
#     def __init__(self):
#         self._hive_host = "bj-gm-prod-cos-datacenter003"
#         self._hive_port = 10000
#         self._hive_username = "strategy"
#         self._hive_database = "online"
#         self._conn = hive.Connection(host=self._hive_host, port=self._hive_port, username=self._hive_username,
#                                      database=self._hive_database)
#         self.cursor = self._conn.cursor()
#         # self.cursor.execute('show tables')
#
#     def __del__(self):
#         self.cursor.close()
#         self._conn.close()


# Dead code kept for reference: zip a directory tree.
# def zipDir(dirpath, outFullName):
#     """
#     Compress the given directory.
#     :param
#     dirpath: path of the directory to compress
#     :param outFullName: full save path of the archive (.../xxxx.zip)
#     :return: None
#     """
#     import zipfile
#     zip = zipfile.ZipFile(outFullName, "w", zipfile.ZIP_DEFLATED)
#     for path, dirnames, filenames in os.walk(dirpath):
#         # Strip the root prefix so only the contents below dirpath are archived.
#         fpath = path.replace(dirpath, '')
#
#         for filename in filenames:
#             zip.write(os.path.join(path, filename), os.path.join(fpath, filename))
#     zip.close()


def send_email_tome():
    """Email the generated weekly .xls report to the recipient list.

    Best effort: any failure is printed and swallowed so the job itself
    does not crash after the report file was already written.
    """
    try:
        date = datetime.datetime.now().date() - datetime.timedelta(days=1)
        fromaddr = 'litao@igengmei.com'  # NOTE(review): assigned but never passed to send_file_email
        content = 'hi all:附件为' + str(date) + '的搜索词数据统计结果以及近一周的数据统计结果,请查收!'
        # Path where __main__ saved the workbook (relative there, absolute here —
        # assumes the job runs from /srv/apps/crawler; TODO confirm).
        zipFile = "/srv/apps/crawler/近1周数据统计结果.xls"
        send_file_email("", "", email_group=["<litao@igengmei.com>","<duanyingrong@igengmei.com>"], title_str=content
                        , email_msg_body_str=content, file=zipFile)
    except Exception as e:
        print(e)


def get_es_word(word, start_ts):
    """Collect per-star-level tractate counts for one search keyword.

    Two queries against gm-dbmw-tractate-read:
      1) online content (is_online=True, content_level in 3..6) matching the
         word in content/tag fields — bucketed by content_level;
      2) crawled content (platform in 13..17, created since start_ts) —
         bucketed by content_level, sub-bucketed by is_online.

    :param word: search keyword to match (match_phrase, gm_default_index analyzer)
    :param start_ts: window start, epoch milliseconds (divided by 1e3 for
                     create_time_epoch which is in seconds)
    :return: list of 9 counts:
             [online 3*, 4*, 5*, 6*, crawled total, crawled-online 3*, 4*, 5*, 6*]
    """
    # tractate oneline
    results = es.search(
        index='gm-dbmw-tractate-read',
        doc_type='tractate',
        timeout='10s',
        size=0,
        body={
            "query": {
                "bool": {
                    "minimum_should_match": 1,
                    "should": [{"match_phrase": {"content": {"query": word, "analyzer": "gm_default_index"}}},
                               {"match_phrase": {"tractate_tag_name": {"query": word, "analyzer": "gm_default_index"}}},
                               {"match_phrase": {"tractate_tag_name_content": {"query": word, "analyzer": "gm_default_index"}}}],
                    "must": [{"term": {"is_online": True}},
                             {"terms": {"content_level": [6, 5, 4, 3]}}]
                }
            },
            "size": 0,
            "aggs": {
                "content_level": {
                    "terms": {
                        "field": "content_level",
                        "size": 10
                    }
                }
            }
        }
    )
    # NOTE(review): with an ES 7 server "hits.total" is a dict
    # {"value": n, "relation": ...}, not an int — confirm what this cluster
    # returns, since the value ends up written straight into the report.
    tractate_content_num = results["hits"]["total"]
    content_level_count = results["aggregations"]["content_level"]["buckets"]
    # tractate craw
    results = es.search(
        index='gm-dbmw-tractate-read',
        doc_type='tractate',
        timeout='10s',
        size=0,
        body={
            "query": {
                "bool": {
                    "minimum_should_match": 1,
                    "should": [{"match_phrase": {"content": {"query": word, "analyzer": "gm_default_index"}}},
                               {"match_phrase": {"tractate_tag_name": {"query": word, "analyzer": "gm_default_index"}}},
                               {"match_phrase": {"tractate_tag_name_content": {"query": word, "analyzer": "gm_default_index"}}}],
                    "must": [
                        {"terms": {"platform": [13, 14, 15, 16, 17]}},
                        {"range": {"create_time_epoch": {"gte": start_ts / 1e3}}}
                    ]
                }
            },
            "size": 0,
            "aggs": {
                "content_level": {
                    "terms": {
                        "field": "content_level",
                        "size": 10
                    },
                    "aggs": {
                        "is_online": {
                            "terms": {
                                "field": "is_online",
                                "size": 10
                            }
                        }
                    }
                }
            }
        }
    )
    craw_tractate_num = results["hits"]["total"]
    craw_level_count = results["aggregations"]["content_level"]["buckets"]
    res_list = []
    # Visible 3/4/5/6-star tractate counts (online query buckets).
    # NOTE(review): bucket "key" is compared against the *string* "3".."6";
    # if content_level is indexed as a numeric field the terms-agg keys are
    # numbers and these comparisons never match (doc_count stays 0) — confirm
    # the field mapping.  Also the error message below mentions
    # craw_level_count although this loop reads content_level_count.
    try:
        for content_level in ("3", "4", "5", "6"):
            doc_count = 0
            for content_level_dic in content_level_count:
                if content_level_dic["key"] == content_level:
                    doc_count = content_level_dic["doc_count"]
                    break
            res_list.append(doc_count)
    except:
        print("craw_level_count error", craw_level_count)
    res_list.append(craw_tractate_num)
    # Crawled-and-online counts per star level (is_online bucket key == 1).
    for content_level in ("3", "4", "5", "6"):
        doc_count = 0
        for content_level_dic in craw_level_count:
            if content_level_dic["key"] == content_level:
                for buck in content_level_dic["is_online"]["buckets"]:
                    if buck["key"] == 1:
                        doc_count = buck["doc_count"]
                        break
        res_list.append(doc_count)
    return res_list


class WritrExcel():
    """Thin xlwt wrapper: one workbook, append-style sheet writing."""

    def __init__(self):
        # Single workbook shared by all write_excel calls.
        self.workbook = xlwt.Workbook(encoding='utf-8')

    def set_style(self, name, height, bold=False):
        """Build an xlwt cell style with the given font name/height/boldness."""
        style = xlwt.XFStyle()  # initialize the style
        font = xlwt.Font()  # create a font for the style
        font.name = name
        font.bold = bold
        font.color_index = 4
        font.height = height
        style.font = font
        return style

    # Write rows into a new sheet.
    def write_excel(self, sheet_name, rows):
        """Create sheet `sheet_name` and write `rows` (iterable of tuples).

        Caps at 65530 rows — xls sheets hold at most 65536 rows.
        """
        # create the sheet
        data_sheet = self.workbook.add_sheet(sheet_name)
        # define the style outside the loop
        default = self.set_style('Times New Roman', 220, True)
        j = k = 0  # NOTE(review): j is never changed; (j + k) is just k
        # write each row of data into the sheet
        for row in rows[:65530]:
            for i in range(len(row)):
                try:
                    # write one cell
                    data_sheet.write((j + k), i, row[i], default)
                except:
                    print(i)
                    raise
            # data_sheet.write(1, i, row1[i], self.set_style('Times New Roman', 220, True))
            k = k + 1
        print("写入sheet成功,共" + str(k) + "行数据")

    def save_excel(self, path):
        """Persist the workbook to `path`."""
        self.workbook.save(path)


def get_search_keywrod_dict():
    """Return {title: {}} for every doc in the framework ES `search_word` index.

    Used as the whitelist of tracked search keywords.  (Name typo
    "keywrod" kept — callers reference it as-is.)
    """
    es_framework = Elasticsearch(hosts='172.16.32.37', port=9200)
    search_body = {
        "query": {
            "bool": {
                "filter": [
                ]
            }
        }
    }
    return_dict = {}
    scan_res = scan(es_framework, query=search_body, index="search_word")
    for res in scan_res:
        return_dict[res["_source"]["title"]] = {}
    return return_dict


def get_hot_crawler_data_count(index, start_ts):
    """Count crawled docs per platform since start_ts (excluding search-word crawls).

    :param index: ES index name on the framework cluster (e.g. crawler-data-raw)
    :param start_ts: epoch milliseconds lower bound on fetch_time —
                     assumes fetch_time is stored in ms; TODO confirm
    :return: terms-aggregation buckets: [{"key": platform, "doc_count": n}, ...]
    """
    es_framework = Elasticsearch(hosts='172.16.32.37', port=9200)
    search_body = {
        "query": {
            "bool": {
                "filter": [
                    {"range": {"fetch_time": {"gte": int(start_ts)}}}
                ],
                "must_not": [
                    {"exists": {"field": "search_word"}}
                ]
            }
        },
        "size": 0,
        "aggs": {
            "NAME": {
                "terms": {
                    "field": "platform",
                    "size": 10
                }
            }
        }
    }
    print(search_body)
    search_res = es_framework.search(body=search_body, index=index)
    platform_count = search_res["aggregations"]["NAME"]["buckets"]
    return platform_count


def get_count_hot_crawler_data_from_mysql(start_ts: int) -> Dict:
    """Count api_tractate rows created since start_ts for weibo(11)/douban(12).

    :param start_ts: epoch milliseconds
    :return: {"weibo": [total, online], "douban": [total, online]}
    """
    data_platform_id_list = [11, 12]
    res_dict = {}
    for platform_id in data_platform_id_list:
        start_date = datetime.datetime.fromtimestamp(start_ts / 1e3)
        start_date_str = start_date.strftime("%Y-%m-%d %H:%M:%S")
        match_all_sql = """select count(id) as nums from api_tractate where create_time >= '{time}' and platform = {platfrom}""".format(
            time=start_date_str, platfrom=platform_id)
        cur_mimas.execute(match_all_sql)
        data = cur_mimas.fetchall()
        match_all_num = data[0][0]
        # print(match_all_num)
        # NOTE(review): unlike match_all_sql this query has no platform filter,
        # so the identical online count is reported for both platforms — it
        # looks like `and platform = {platform}` is missing; confirm intent.
        oneline_sql = """select count(id) as nums from api_tractate where create_time >= '{time}' and is_online = 1 and pgc_type = 1""".format(
            time=start_date_str)
        cur_mimas.execute(oneline_sql)
        data = cur_mimas.fetchall()
        match_oneline_num = data[0][0]
        # print(match_oneline_num)
        if platform_id == 11:
            platform = 'weibo'
        elif platform_id == 12:
            platform = "douban"
        res_dict[platform] = []
        res_dict[platform].append(match_all_num)
        res_dict[platform].append(match_oneline_num)
    return res_dict


def week_num(year=None, cycle=None, cycle_num=None, compare_type=None):
    """Compute the reporting window (ISO-week based).

    :param year: unused — immediately overwritten below; kept for
                 interface compatibility
    :param cycle: unused
    :param cycle_num: optional explicit ISO week number; defaults to the
                      current week
    :param compare_type: unused
    :return: (index_name, first_day_ts_ms, last_day_ts_ms,
              this_week, last_week, last_year)

    NOTE(review): `last_week` is the string "52" in one branch but an int in
    the other, and ISO years may have 53 weeks — confirm downstream usage
    (currently the value appears unused by callers in this file).
    """
    now = datetime.datetime.now()
    now_canlendar = now.isocalendar()
    if not cycle_num:
        week_canlendar = now_canlendar
    else:
        week_canlendar = (now.year, cycle_num + 1, 0)
    year = week_canlendar[0]
    this_week = week_canlendar[1]
    if this_week == 0:
        last_year = year - 1
        this_week = 1
    else:
        last_year = year
    if this_week == 1:
        last_week = "52"
    else:
        last_week = this_week - 1
    # Midnight today.
    today = datetime.datetime(datetime.datetime.now().year, datetime.datetime.now().month,
                              datetime.datetime.now().day)
    # today = datetime.datetime(year=2018, month=12, day=25)
    # Window start: walk back to the requested week (the +2 offset is
    # presumably aligning the week boundary — TODO confirm the intent).
    first_day_in_week = today - datetime.timedelta(
        days=now_canlendar[2] + 7 * (now_canlendar[1] - week_canlendar[1]) + 2)
    fisrt_day_ts = int(first_day_in_week.timestamp() * 1e3)  # (sic: "fisrt")
    last_day_in_week = first_day_in_week + datetime.timedelta(days=7)
    last_day_ts = int(last_day_in_week.timestamp() * 1e3)
    this_week_index = 'crawler-data-raw'
    return this_week_index, fisrt_day_ts, last_day_ts, this_week, last_week, last_year


def get_how_word_crawler_count(data_index, start_ts, end_ts, week_num, last_week_num, year):
    """Build the rows of the "hot content crawl" sheet.

    Joins the per-platform ES crawl counts with the MySQL push/online counts
    for douban and weibo.  `end_ts`, `last_week_num` and `year` are accepted
    but unused.

    :return: list of tuples, first row is the header
    """
    res_data = [("周", "抓取平台", "入库内容数", "推送候选内容入库数", "上线内容数", "推送内容数")]
    craw_data_count = get_hot_crawler_data_count(data_index, start_ts)
    res_dict = get_count_hot_crawler_data_from_mysql(start_ts)
    for platform in ("douban", "weibo"):
        craw_one_week_count = ""  # empty cell when the platform bucket is absent
        for buck in craw_data_count:
            if buck["key"] == platform:
                craw_one_week_count = buck["doc_count"]
        push_num = res_dict[platform][0]
        oneline_num = res_dict[platform][1]
        new_line = (week_num, platform, craw_one_week_count, push_num, oneline_num, "")
        res_data.append(new_line)
    return res_data


def get_keyword_ctr(start_ts, end_ts):
    """Count search-result exposures per query word via two Hive queries.

    1) precise-exposure events grouped by card_id (used here as the query word);
    2) report_status events whose params['query_words'] is a JSON list — each
       listed word adds 1 to its count.

    :param start_ts: window start, epoch ms (formatted as partition_date YYYYMMDD)
    :param end_ts: window end, epoch ms (exclusive)
    :return: {query_word: exposure_count}
    """
    start_date = datetime.datetime.fromtimestamp(start_ts/1e3)
    end_date = datetime.datetime.fromtimestamp(end_ts/1e3)
    start_date_str = start_date.strftime("%Y%m%d")
    end_date_str = end_date.strftime("%Y%m%d")
    data_dic = {}
    # -- query word exposures
    baoguang_sql = """
SELECT card_id as query,count(*) as query_count FROM online.ml_community_precise_exposure_detail
WHERE partition_date>='{start_date}' AND partition_date<'{end_date}'
and page_name in ('search_home','search_home_more','search_home_welfare','search_home_diary','search_home_wiki','search_home_post','search_home_hospital','search_home_doctor')
group by query
        """.format(start_date=str(start_date_str),end_date=str(end_date_str))
    device_df = spark.sql(baoguang_sql)
    device_df.show(1, False)
    sql_res = device_df.collect()
    print("-----------------------------------------------------------------------------")
    for res in sql_res:
        data_dic[res.query] = res.query_count
    # -- query words reported by the client (JSON list per event)
    query_sql = """
SELECT params['query_words'] as query_words FROM online.bl_hdfs_maidian_updates
WHERE partition_date>='{start_date}' AND partition_date<'{end_date}'
AND action = 'report_status'
AND page_name in ('search_home','search_home_more','search_home_welfare','search_home_diary','search_home_wiki','search_home_post','search_home_hospital','search_home_doctor')
    """.format(start_date=start_date_str,end_date=end_date_str)
    device_df = spark.sql(query_sql)
    device_df.show(1, False)
    sql_res = device_df.collect()
    for res in sql_res:
        try:
            res_json = json.loads(res.query_words)
            # print(res_json, type(res_json))
        except:
            # Malformed / NULL query_words — skip the event.
            continue
        for single_keyword in res_json:
            data_count = data_dic.get(single_keyword)
            if data_count:
                data_dic[single_keyword] = data_dic[single_keyword] + 1
            else:
                data_dic[single_keyword] = 1
    print(data_dic)
    return data_dic


def craw_query_one_week(data_index, start_ts, end_ts, week_num, last_week_num, year, word_count):
    """Build the rows of the "query crawl" sheet.

    For each tracked keyword (whitelist from get_search_keywrod_dict) that
    was searched in the last 30 days: search pv/uv, CTR against the exposure
    counts in `word_count`, and the per-star content counts from get_es_word.
    A second pass adds every whitelisted word that has visible content.

    `data_index`, `end_ts`, `last_week_num` and `year` are accepted but unused.

    NOTE(review): the second loop iterates the whole whitelist, including
    words already emitted by the first loop, so those words appear twice
    (the second time with zeroed pv/uv) — confirm whether it should skip
    words handled above.

    :return: list of tuples, first row is the header
    """
    tag_names_list_week = []
    one_month_ago = datetime.datetime.now().date() - datetime.timedelta(days=30)
    search_keyword_dict = get_search_keywrod_dict()
    sql = 'select keywords,sum(sorted) as nums,sum(uv) as uvs from api_search_words where is_delete = 0 and create_time >= "' + str(
        one_month_ago) + '" group by keywords order by nums desc'
    print(sql)
    zhengxing_cursor.execute("set names 'UTF8'")
    zhengxing_cursor.execute(sql)
    data = zhengxing_cursor.fetchall()
    tup_title = ("关键词", "近一周搜索内容ctr", "过去30天搜索pv", "过去30天搜索uv", "帖子-3星内容数量", "帖子-4星内容数量",
                 "帖子-5星内容数量", "帖子-6星内容数量", "近一周抓取数量", "近一周审核上线3星数量", "近一周审核上线4星数量",
                 "近一周审核上线5星数量", "近一周审核上线6星数量")
    tag_names_list_week.append(tup_title)
    for name in list(data):
        word = name.get("keywords", None)
        if word in search_keyword_dict:
            sorteds = name.get("nums", 0)
            uv = name.get("uvs", 0)
            tractate_content_num = get_es_word(word, start_ts)
            exposure_count = word_count.get(word)
            if not exposure_count:
                ctr = ""  # no exposures recorded — leave the CTR cell blank
            else:
                ctr = str(round(sorteds/exposure_count * 100, 2)) + "%"
            new_line = [word, ctr, sorteds, uv]
            tag_names_list_week.append(tuple(new_line + tractate_content_num))
    # Whitelisted words with some visible content even if not searched recently.
    # NOTE(review): tractate_content_num[0] is the 3-star count, not a total —
    # confirm that is the intended "has content" test.
    for word in search_keyword_dict:
        tractate_content_num = get_es_word(word, start_ts)
        if tractate_content_num[0] != 0:
            new_line = [word, 0, 0, 0]
            tag_names_list_week.append(tuple(new_line + tractate_content_num))
    return tag_names_list_week


if __name__ == "__main__":
    # NOTE(review): this unpacking rebinds the name `week_num` from the
    # function to an int — it works because the function is only called once,
    # but it is fragile; consider renaming the local.
    data_index, start_ts, end_ts, week_num, last_week_num, year = week_num()
    # NOTE(review): hard-coded override of the computed window start
    # (2020-08-17 00:00 CST) — looks like leftover debug/backfill code;
    # confirm before every scheduled run uses this fixed date.
    start_ts = 1597593600000
    craw_one_week = get_how_word_crawler_count(data_index, start_ts, end_ts, week_num, last_week_num, year)
    word_count = get_keyword_ctr(start_ts, end_ts)
    # weekly hot-content crawl counts
    print(craw_one_week)
    # per-query weekly crawl detail
    all_data_week = craw_query_one_week(data_index, start_ts, end_ts, week_num, last_week_num, year, word_count)
    path = "近1周数据统计结果.xls"
    exl = WritrExcel()
    exl.write_excel("热点内容抓取周报", tuple(craw_one_week))
    exl.write_excel("query抓取周报", tuple(all_data_week))
    exl.save_excel(path)
    print(u'创建demo.xls文件成功')
    send_email_tome()