Commit 133175f8 authored by litaolemo

update

parent b5edc529
......@@ -10,17 +10,66 @@ import xlwt, datetime
from maintenance.func_send_email_with_file import send_file_email
import zipfile
import redis
from pyhive import hive
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
from crawler.crawler_sys.utils.trans_qiniu_img import write_data_into_mysql
import sys
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import lit
import pytispark.pytispark as pti
import pandas as pd
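# Spark / TiSpark session setup: Hive support, Kryo serialization, recursive input dirs, plus prod.* JDBC/TiDB connection strings kept in the Spark conf.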
startTime = time.time()
sparkConf = SparkConf()
sparkConf.set("spark.sql.crossJoin.enabled", True)
sparkConf.set("spark.debug.maxToStringFields", "100")
sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
sparkConf.set("spark.tispark.plan.allow_index_read", True)
sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
sparkConf.set("mapreduce.map.output.compress", False)
sparkConf.set("prod.gold.jdbcuri", "jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true")
sparkConf.set("prod.mimas.jdbcuri", "jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true")
sparkConf.set("prod.gaia.jdbcuri", "jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true")
sparkConf.set("prod.tidb.jdbcuri", "jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.jerry.jdbcuri", "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
sparkConf.set("prod.tidb.database", "jerry_prod")
spark = (SparkSession.builder.config(conf=sparkConf).config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
.config("spark.tispark.pd.addresses", "172.16.40.170:2379").appName("LR PYSPARK TEST").enableHiveSupport().getOrCreate())
ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test")
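# Register Hive UDF jars and expose json_map / is_json / arrayMerge as temporary SQL functions.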
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")
trainDays = sys.argv[1]
print('trainDays:{}'.format(trainDays))
# user_tag3_portrait_df = connectTi(spark,'user_tag3_portrait')
# user_tag3_portrait_df.createOrReplaceTempView("user_tag3_portrait")
# user_tag3_portrait_df.show(1,False)
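# Client connections: Elasticsearch (crawler/search indices), Redis, and the mimas_prod / zhengxing MySQL databases.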
es_framework = Elasticsearch(hosts='172.16.32.37', port=9200)
rds = redis.StrictRedis(host='172.16.40.164', port=6379, db=19, password='ReDis!GmTx*0aN12')
conn_mimas = pymysql.connect(host='172.16.30.138', port=3306, user='mimas', passwd='GJL3UJe1Ck9ggL6aKnZCq4cRvM',
db='mimas_prod', charset='utf8mb4')
cur_mimas = conn_mimas.cursor()
db_zhengxing_eagle = pymysql.connect(host="172.16.30.136", port=3306, user="doris",
......@@ -40,6 +89,21 @@ es = Elasticsearch([
}])
# class Hive_conn():
# def __init__(self):
# self._hive_host = "bj-gm-prod-cos-datacenter003"
# self._hive_port = 10000
# self._hive_username = "strategy"
# self._hive_database = "online"
# self._conn = hive.Connection(host=self._hive_host, port=self._hive_port, username=self._hive_username,
# database=self._hive_database)
# self.cursor = self._conn.cursor()
# # self.cursor.execute('show tables')
#
# def __del__(self):
# self.cursor.close()
# self._conn.close()
# def zipDir(dirpath,outFullName):
# """
......@@ -70,7 +134,7 @@ def send_email_tome():
print(e)
def get_es_word(word, start_ts):
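# For one keyword: count online tractates per content level (3-6) in ES, plus tractates crawled since start_ts (total and per level); the counts come back as one flat list used to fill a report row.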
# tractate oneline
results = es.search(
index='gm-dbmw-tractate-read',
......@@ -88,15 +152,15 @@ def get_es_word(word,start_ts):
"must": [{"term": {"is_online": True}}, {
"terms":
{"content_level": [6, 5, 4, 3]}
}]
}
},"size": 0,"aggs": {
"content_level": {
}, "size": 0, "aggs": {
"content_level": {
"terms": {
"field": "content_level",
"size": 10
"field": "content_level",
"size": 10
}
}
}
}
}
)
......@@ -118,7 +182,8 @@ def get_es_word(word,start_ts):
"match_phrase": {"tractate_tag_name_content": {"query": word,
"analyzer": "gm_default_index"}}}],
"must": [
{"terms": {"platform": [13,14,15,16,17]}}, {"range":{"create_time_epoch":{"gte":start_ts/1e3}}}
{"terms": {"platform": [13, 14, 15, 16, 17]}},
{"range": {"create_time_epoch": {"gte": start_ts / 1e3}}}
]
}
}, "size": 0, "aggs": {
......@@ -127,14 +192,14 @@ def get_es_word(word,start_ts):
"field": "content_level",
"size": 10
},
"aggs": {
"aggs": {
"is_online": {
"terms": {
"field": "is_online",
"size": 10
}
"terms": {
"field": "is_online",
"size": 10
}
}
}
}
}
}
......@@ -144,7 +209,7 @@ def get_es_word(word,start_ts):
res_list = []
# posts visible at content levels 3/4/5/6
try:
for content_level in ("3","4","5","6"):
for content_level in ("3", "4", "5", "6"):
doc_count = 0
for content_level_dic in content_level_count:
if content_level_dic["key"] == content_level:
......@@ -152,9 +217,9 @@ def get_es_word(word,start_ts):
break
res_list.append(doc_count)
except:
print("craw_level_count error" ,craw_level_count)
print("craw_level_count error", craw_level_count)
res_list.append(craw_tractate_num)
for content_level in ("3","4","5","6"):
for content_level in ("3", "4", "5", "6"):
doc_count = 0
for content_level_dic in craw_level_count:
if content_level_dic["key"] == content_level:
......@@ -202,62 +267,64 @@ class WritrExcel():
k = k + 1
print("写入sheet成功,共" + str(k) + "行数据")
def save_excel(self, path):
self.workbook.save(path)
def get_search_keywrod_dict():
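# Scan the search_word index and return {title: {}} for every tracked search keyword.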
search_body = {
"query": {
"bool": {
"filter": [
"bool": {
"filter": [
]
}
}
}
return_dict = {}
scan_res = scan(es_framework, query=search_body, index="search_word")
for res in scan_res:
return_dict[res["_source"]["title"]] = {}
return return_dict
def get_hot_crawler_data_count(index, start_ts):
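# Aggregate crawled docs fetched since start_ts by platform, excluding docs that already carry a search_word.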
search_body = {
"query": {
"bool": {
"filter": [
{"range":{"fetch_time":{"gte":start_ts}}}
],"must_not": [
{"exists": {"field": "search_word"}}
]
}
},"size": 0,"aggs": {
"NAME": {
"terms": {
"field": "platform",
"size": 10
"bool": {
"filter": [
{"range": {"fetch_time": {"gte": start_ts}}}
], "must_not": [
{"exists": {"field": "search_word"}}
]
}
}, "size": 0, "aggs": {
"NAME": {
"terms": {
"field": "platform",
"size": 10
}
}
}
}
search_res = es_framework.search(body=search_body, index=index)
platform_count = search_res["aggregations"]["NAME"]["buckets"]
return platform_count
def get_count_hot_crawler_data_from_mysql(start_ts: int) -> Dict:
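# For each crawler platform id (11, 12), count api_tractate rows created since start_ts: all rows and online-only rows.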
data_platform_id_list = [11, 12]
res_dict = {}
for platform_id in data_platform_id_list:
start_date = datetime.datetime.fromtimestamp(start_ts / 1e3)
match_all_sql = """select count(id) as nums from api_tractate where create_time >= '{time}' and platform = {platfrom}""".format(
time=str(start_date), platfrom=platform_id)
cur_mimas.execute(match_all_sql)
data = cur_mimas.fetchall()
match_all_num = data[0][0]
# print(match_all_num)
oneline_sql = """select count(id) as nums from api_tractate where create_time >= '{time}' and is_online = 1 and platform = {platfrom}""".format(time=str(start_date),platfrom=platform_id)
oneline_sql = """select count(id) as nums from api_tractate where create_time >= '{time}' and is_online = 1 and platform = {platfrom}""".format(
time=str(start_date), platfrom=platform_id)
cur_mimas.execute(oneline_sql)
data = cur_mimas.fetchall()
match_oneline_num = data[0][0]
......@@ -271,6 +338,7 @@ def get_count_hot_crawler_data_from_mysql(start_ts:int) ->Dict:
res_dict[platform].append(match_oneline_num)
return res_dict
def week_num(year=None, cycle=None, cycle_num=None, compare_type=None):
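# Work out the reporting window: returns the ES index to query, the first/last-day timestamps, and the current / previous week numbers and year.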
now = datetime.datetime.now()
now_canlendar = now.isocalendar()
......@@ -300,23 +368,66 @@ def week_num(year=None, cycle=None, cycle_num=None, compare_type=None):
this_week_index = 'crawler-data-raw'
return this_week_index, fisrt_day_ts, last_day_ts, this_week, last_week, last_year
def get_how_word_crawler_count(data_index, start_ts, end_ts, week_num, last_week_num, year):
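# Build the rows of the weekly hot-content sheet: per platform (douban, weibo), the count crawled into ES plus the push-candidate and online counts from MySQL.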
res_data = [("周","抓取平台","入库内容数","推送候选内容入库数","上线内容数","推送内容数")]
res_data = [("周", "抓取平台", "入库内容数", "推送候选内容入库数", "上线内容数", "推送内容数")]
craw_data_count = get_hot_crawler_data_count(data_index, start_ts)
res_dict = get_count_hot_crawler_data_from_mysql(start_ts)
for platform in ("douban","weibo"):
for platform in ("douban", "weibo"):
craw_one_week_count = ""
for buck in craw_data_count:
if buck["key"] == platform:
craw_one_week_count = buck["doc_count"]
push_num = res_dict[platform][0]
oneline_num = res_dict[platform][1]
new_line = (week_num, platform, craw_one_week_count, push_num, oneline_num, "")
res_data.append(new_line)
return res_data
def get_keyword_ctr(hive, start_ts, end_ts):
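# (work in progress) Pull per-query exposure counts from Hive via Spark SQL; the CTR aggregation further down is still commented out.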
start_date = datetime.datetime.fromtimestamp(start_ts / 1e3)
end_date = datetime.datetime.fromtimestamp(end_ts / 1e3)
start_date_str = start_date.strftime("%Y%m%d")
end_date_str = end_date.strftime("%Y%m%d")
data_dic = {}
# -- query term exposure
baoguang_sql = """
SELECT card_id as query,count(*) as query_count FROM online.ml_community_precise_exposure_detail
WHERE partition_date>='{start_date}' AND partition_date<'{end_date}' and page_name in ('search_home','search_home_more','search_home_welfare','search_home_diary','search_home_wiki','search_home_post','search_home_hospital','search_home_doctor') group by query
""".format(start_date=str(start_date_str),end_date=str(end_date_str))
device_df = spark.sql(baoguang_sql)
device_df.show(1, False)
sql_res = device_df.collect()
for res in sql_res:
print(res)
print(res.query)
# hive.cursor.execute(baoguang_sql)
# for data in hive.cursor.fetchall():
# data_dic[data[0]] = data[1]
# -- searched query words
query_sql = """
SELECT params['query_words']
FROM bl_hdfs_maidian_updates
WHERE partition_date>='{start_date}' AND partition_date<'{end_date}'
AND action = 'report_status'
AND page_name in ('search_home','search_home_more','search_home_welfare','search_home_diary','search_home_wiki','search_home_post','search_home_hospital','search_home_doctor')
""".format(start_date=start_date_str,end_date=end_date_str)
device_df = spark.sql(query_sql)
device_df.show(1, False)
device_df.collect()
# for single_keyword in device_df[0]:
# data_count = data_dic.get(single_keyword)
# if data_count:
# data_dic[single_keyword] = data_dic[single_keyword] + 1
# else:
# data_dic[single_keyword] = 0
# return data_dic
def craw_query_one_week(data_index, start_ts, end_ts, week_num, last_week_num, year):
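# Build the rows of the weekly query sheet: last-30-day search pv/uv per keyword joined with the per-level content counts from get_es_word.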
tag_names_list_week = []
one_month_ago = datetime.datetime.now().date() - datetime.timedelta(days=30)
search_keyword_dict = get_search_keywrod_dict()
......@@ -329,17 +440,18 @@ def craw_query_one_week(data_index, start_ts, end_ts, week_num, last_week_num, y
data = zhengxing_cursor.fetchall()
tup_title = ("关键词", "近一周搜索内容ctr", "过去30天搜索pv", "过去30天搜索uv", "帖子-3星内容数量", "帖子-4星内容数量",
"帖子-5星内容数量","帖子-6星内容数量","近一周抓取数量","近一周审核上线3星数量",
"近一周审核上线4星数量","近一周审核上线5星数量","近一周审核上线6星数量")
"帖子-5星内容数量", "帖子-6星内容数量", "近一周抓取数量", "近一周审核上线3星数量",
"近一周审核上线4星数量", "近一周审核上线5星数量", "近一周审核上线6星数量")
tag_names_list_week.append(tup_title)
for name in list(data):
word = name.get("keywords", None)
if word in search_keyword_dict:
sorteds = name.get("nums", 0)
uv = name.get("uvs", 0)
tractate_content_num = get_es_word(word, start_ts)
new_line = [word, "", sorteds, uv]
tag_names_list_week.append(tuple(new_line + tractate_content_num))
for word in search_keyword_dict:
tractate_content_num = get_es_word(word, start_ts)
if tractate_content_num[0] != 0:
......@@ -347,17 +459,19 @@ def craw_query_one_week(data_index, start_ts, end_ts, week_num, last_week_num, y
tag_names_list_week.append(tuple(new_line + tractate_content_num))
return tag_names_list_week
if __name__ == "__main__":
data_index, start_ts, end_ts, week_num, last_week_num, year = week_num()
# weekly count of hot content crawled
craw_one_week = get_how_word_crawler_count(data_index, start_ts, end_ts, week_num, last_week_num, year)
print(craw_one_week)
# query: weekly crawl detail
all_data_week = craw_query_one_week(data_index, start_ts, end_ts, week_num, last_week_num, year )
path = "近1周数据统计结果.xls"
exl = WritrExcel()
exl.write_excel("热点内容抓取周报", tuple(craw_one_week))
exl.write_excel("query抓取周报", tuple(all_data_week))
exl.save_excel(path)
print(u'创建demo.xls文件成功')
send_email_tome()
# craw_one_week = get_how_word_crawler_count(data_index, start_ts, end_ts, week_num, last_week_num, year)
# print(craw_one_week)
# # query: weekly crawl detail
# all_data_week = craw_query_one_week(data_index, start_ts, end_ts, week_num, last_week_num, year)
# path = "近1周数据统计结果.xls"
# exl = WritrExcel()
# exl.write_excel("热点内容抓取周报", tuple(craw_one_week))
# exl.write_excel("query抓取周报", tuple(all_data_week))
# exl.save_excel(path)
# print(u'创建demo.xls文件成功')
# send_email_tome()
word_count = get_keyword_ctr(hive, start_ts, end_ts)
\ No newline at end of file