# -*- coding:UTF-8 -*-
# @Time : 2020/8/11 14:33
# @File : crawler_week_report.py
# @email : litao@igengmei.com
# @author : litao
import json
import pymysql
import xlwt, datetime
import redis
# from pyhive import hive
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
from elasticsearch_7 import Elasticsearch
from elasticsearch_7.helpers import scan
import sys
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import lit
import pytispark.pytispark as pti
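
# Weekly crawler report: collects keyword and crawl statistics from Elasticsearch,
# MySQL (mimas, doris/zhengxing) and Spark SQL (Hive tables via TiSpark), writes two
# sheets into an .xls workbook, and emails the result.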
startTime = time.time()
sparkConf = SparkConf()
sparkConf.set("spark.sql.crossJoin.enabled", True)
sparkConf.set("spark.debug.maxToStringFields", "100")
sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
sparkConf.set("spark.tispark.plan.allow_index_read", True)
sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
sparkConf.set("mapreduce.map.output.compress", False)
sparkConf.set("prod.gold.jdbcuri", "jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true")
sparkConf.set("prod.mimas.jdbcuri", "jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true")
sparkConf.set("prod.gaia.jdbcuri", "jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true")
sparkConf.set("prod.tidb.jdbcuri", "jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.jerry.jdbcuri", "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
sparkConf.set("prod.tidb.database", "jerry_prod")
spark = (SparkSession.builder.config(conf=sparkConf).config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
.config("spark.tispark.pd.addresses", "172.16.40.170:2379").appName("LR PYSPARK TEST").enableHiveSupport().getOrCreate())
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")
rds = redis.StrictRedis(host='172.16.40.164', port=6379, db=19, password='ReDis!GmTx*0aN12')
conn_mimas = pymysql.connect(host='172.16.30.138', port=3306, user='mimas', passwd='GJL3UJe1Ck9ggL6aKnZCq4cRvM',
db='mimas_prod', charset='utf8mb4')
cur_mimas = conn_mimas.cursor()
db_zhengxing_eagle = pymysql.connect(host="172.16.30.136", port=3306, user="doris",
password="o5gbA27hXHHm",
db="doris_prod",
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
zhengxing_cursor = db_zhengxing_eagle.cursor()
es = Elasticsearch([
{
'host': '172.16.31.17',
'port': 9200,
}, {
'host': '172.16.31.11',
'port': 9200,
}])
# class Hive_conn():
# def __init__(self):
# self._hive_host = "bj-gm-prod-cos-datacenter003"
# self._hive_port = 10000
# self._hive_username = "strategy"
# self._hive_database = "online"
# self._conn = hive.Connection(host=self._hive_host, port=self._hive_port, username=self._hive_username,
# database=self._hive_database)
# self.cursor = self._conn.cursor()
# # self.cursor.execute('show tables')
#
# def __del__(self):
# self.cursor.close()
# self._conn.close()
# def zipDir(dirpath, outFullName):
#     """
#     Zip the given directory.
#     :param dirpath: path of the directory to compress
#     :param outFullName: output path of the archive (path + xxxx.zip)
#     :return: None
#     """
#     import zipfile
#     zip = zipfile.ZipFile(outFullName, "w", zipfile.ZIP_DEFLATED)
#     for path, dirnames, filenames in os.walk(dirpath):
#         # strip the root path so only contents below dirpath are archived
#         fpath = path.replace(dirpath, '')
#
#         for filename in filenames:
#             zip.write(os.path.join(path, filename), os.path.join(fpath, filename))
#     zip.close()
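# Email yesterday's keyword statistics with the weekly workbook attached; the attachment
# is expected at /srv/apps/crawler/近1周数据统计结果.xls (the file written in __main__).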
def send_email_tome():
try:
date = datetime.datetime.now().date() - datetime.timedelta(days=1)
fromaddr = 'litao@igengmei.com'
content = 'hi all:附件为' + str(date) + '的搜索词数据统计结果以及近一周的数据统计结果,请查收!'
zipFile = "/srv/apps/crawler/近1周数据统计结果.xls"
send_file_email("", "", email_group=["<litao@igengmei.com>","<duanyingrong@igengmei.com>"], title_str=content
, email_msg_body_str=content, file=zipFile)
except Exception as e:
print(e)
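# Query the tractate index for one keyword and return a 9-element list: online doc counts
# for content levels 3/4/5/6 matching the keyword, the total number of crawled docs
# (platforms 13-17) matching the keyword since start_ts, and the per-level counts of those
# crawled docs that are online.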
def get_es_word(word, start_ts):
    # tractate: online content matching the keyword
results = es.search(
index='gm-dbmw-tractate-read',
doc_type='tractate',
timeout='10s',
size=0,
body={
"query": {
"bool": {
"minimum_should_match": 1,
"should": [{"match_phrase": {"content": {"query": word, "analyzer": "gm_default_index"}}}, {
"match_phrase": {"tractate_tag_name": {"query": word, "analyzer": "gm_default_index"}}}, {
"match_phrase": {"tractate_tag_name_content": {"query": word,
"analyzer": "gm_default_index"}}}],
"must": [{"term": {"is_online": True}}, {
"terms":
{"content_level": [6, 5, 4, 3]}
}]
}
}, "size": 0, "aggs": {
"content_level": {
"terms": {
"field": "content_level",
"size": 10
}
}
}
}
)
tractate_content_num = results["hits"]["total"]
content_level_count = results["aggregations"]["content_level"]["buckets"]
    # tractate: content crawled (platforms 13-17) since start_ts
results = es.search(
index='gm-dbmw-tractate-read',
doc_type='tractate',
timeout='10s',
size=0,
body={
"query": {
"bool": {
"minimum_should_match": 1,
"should": [{"match_phrase": {"content": {"query": word, "analyzer": "gm_default_index"}}}, {
"match_phrase": {"tractate_tag_name": {"query": word, "analyzer": "gm_default_index"}}}, {
"match_phrase": {"tractate_tag_name_content": {"query": word,
"analyzer": "gm_default_index"}}}],
"must": [
{"terms": {"platform": [13, 14, 15, 16, 17]}},
{"range": {"create_time_epoch": {"gte": start_ts / 1e3}}}
]
}
}, "size": 0, "aggs": {
"content_level": {
"terms": {
"field": "content_level",
"size": 10
},
"aggs": {
"is_online": {
"terms": {
"field": "is_online",
"size": 10
}
}
}
}
}
}
)
craw_tractate_num = results["hits"]["total"]
craw_level_count = results["aggregations"]["content_level"]["buckets"]
res_list = []
    # visible posts with content level 3/4/5/6
    try:
        for content_level in ("3", "4", "5", "6"):
            doc_count = 0
            for content_level_dic in content_level_count:
                # aggregation bucket keys may come back as numbers, so compare as strings
                if str(content_level_dic["key"]) == content_level:
                    doc_count = content_level_dic["doc_count"]
                    break
            res_list.append(doc_count)
    except:
        print("content_level_count error", content_level_count)
res_list.append(craw_tractate_num)
    for content_level in ("3", "4", "5", "6"):
        doc_count = 0
        for content_level_dic in craw_level_count:
            if str(content_level_dic["key"]) == content_level:
                for buck in content_level_dic["is_online"]["buckets"]:
                    if buck["key"] == 1:
                        doc_count = buck["doc_count"]
                        break
        res_list.append(doc_count)
return res_list
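# Thin xlwt wrapper: add one sheet per write_excel() call and save the workbook with save_excel().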
class WritrExcel():
def __init__(self):
self.workbook = xlwt.Workbook(encoding='utf-8')
    def set_style(self, name, height, bold=False):
        style = xlwt.XFStyle()  # initialize the style
        font = xlwt.Font()  # create a font for the style
        font.name = name
        font.bold = bold
        font.colour_index = 4  # xlwt uses the British spelling "colour_index"
        font.height = height
        style.font = font
        return style
    # Write rows into a new sheet of the workbook
def write_excel(self, sheet_name, rows):
        # create the sheet (the workbook itself is created in __init__)
        data_sheet = self.workbook.add_sheet(sheet_name)
        # define the cell style once, outside the loop
        default = self.set_style('Times New Roman', 220, True)
        j = k = 0
        # write each row; .xls sheets are capped at 65536 rows, so truncate defensively
        for row in rows[:65530]:
for i in range(len(row)):
try:
                    # write the cell
data_sheet.write((j + k), i, row[i], default)
except:
print(i)
raise
# data_sheet.write(1, i, row1[i], self.set_style('Times New Roman', 220, True))
k = k + 1
print("写入sheet成功,共" + str(k) + "行数据")
def save_excel(self, path):
self.workbook.save(path)
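# Load all monitored search words (the title field of the search_word index) into a dict.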
def get_search_keywrod_dict():
es_framework = Elasticsearch(hosts='172.16.32.37', port=9200)
search_body = {
"query": {
"bool": {
"filter": [
]
}
}
}
return_dict = {}
scan_res = scan(es_framework, query=search_body, index="search_word")
for res in scan_res:
return_dict[res["_source"]["title"]] = {}
return return_dict
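# Aggregate crawled docs that have no search_word field by platform, since start_ts, in the given index.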
def get_hot_crawler_data_count(index, start_ts):
es_framework = Elasticsearch(hosts='172.16.32.37', port=9200)
search_body = {
"query": {
"bool": {
"filter": [
{"range": {"fetch_time": {"gte": int(start_ts)}}}
], "must_not": [
{"exists": {"field": "search_word"}}
]
}
}, "size": 0, "aggs": {
"NAME": {
"terms": {
"field": "platform",
"size": 10
}
}
}
}
print(search_body)
search_res = es_framework.search(body=search_body, index=index)
platform_count = search_res["aggregations"]["NAME"]["buckets"]
return platform_count
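# For weibo (platform 11) and douban (platform 12), count api_tractate rows created since
# start_ts, plus rows that are online with pgc_type = 1 (the latter is not filtered by platform).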
def get_count_hot_crawler_data_from_mysql(start_ts: int) -> Dict:
data_platform_id_list = [11, 12]
res_dict = {}
for platform_id in data_platform_id_list:
start_date = datetime.datetime.fromtimestamp(start_ts / 1e3)
start_date_str = start_date.strftime("%Y-%m-%d %H:%M:%S")
match_all_sql = """select count(id) as nums from api_tractate where create_time >= '{time}' and platform = {platfrom}""".format(
time=start_date_str, platfrom=platform_id)
cur_mimas.execute(match_all_sql)
data = cur_mimas.fetchall()
match_all_num = data[0][0]
# print(match_all_num)
oneline_sql = """select count(id) as nums from api_tractate where create_time >= '{time}' and is_online = 1 and pgc_type = 1""".format(
time=start_date_str)
cur_mimas.execute(oneline_sql)
data = cur_mimas.fetchall()
match_oneline_num = data[0][0]
# print(match_oneline_num)
if platform_id == 11:
platform = 'weibo'
elif platform_id == 12:
platform = "douban"
res_dict[platform] = []
res_dict[platform].append(match_all_num)
res_dict[platform].append(match_oneline_num)
return res_dict
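# Work out the reporting week: returns the ES index name, the week's start and end
# timestamps in milliseconds, the current ISO week number, the previous week number,
# and the previous week's year.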
def week_num(year=None, cycle=None, cycle_num=None, compare_type=None):
    now = datetime.datetime.now()
    now_calendar = now.isocalendar()
    if not cycle_num:
        week_calendar = now_calendar
    else:
        week_calendar = (now.year, cycle_num + 1, 0)
    year = week_calendar[0]
    this_week = week_calendar[1]
    if this_week == 0:
        last_year = year - 1
        this_week = 1
    else:
        last_year = year
    if this_week == 1:
        last_week = 52
    else:
        last_week = this_week - 1
    today = datetime.datetime(datetime.datetime.now().year, datetime.datetime.now().month, datetime.datetime.now().day)
    # today = datetime.datetime(year=2018, month=12, day=25)
    first_day_in_week = today - datetime.timedelta(
        days=now_calendar[2] + 7 * (now_calendar[1] - week_calendar[1]) + 2)
    first_day_ts = int(first_day_in_week.timestamp() * 1e3)
    last_day_in_week = first_day_in_week + datetime.timedelta(days=7)
    last_day_ts = int(last_day_in_week.timestamp() * 1e3)
    this_week_index = 'crawler-data-raw'
    return this_week_index, first_day_ts, last_day_ts, this_week, last_week, last_year
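# Build the rows for the "hot content" sheet: for douban and weibo, the number of crawled
# docs since start_ts plus the MySQL counts from get_count_hot_crawler_data_from_mysql().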
def get_how_word_crawler_count(data_index, start_ts, end_ts, week_num, last_week_num, year):
res_data = [("周", "抓取平台", "入库内容数", "推送候选内容入库数", "上线内容数", "推送内容数")]
craw_data_count = get_hot_crawler_data_count(data_index, start_ts)
res_dict = get_count_hot_crawler_data_from_mysql(start_ts)
for platform in ("douban", "weibo"):
craw_one_week_count = ""
for buck in craw_data_count:
if buck["key"] == platform:
craw_one_week_count = buck["doc_count"]
push_num = res_dict[platform][0]
oneline_num = res_dict[platform][1]
new_line = (week_num, platform, craw_one_week_count, push_num, oneline_num, "")
res_data.append(new_line)
return res_data
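# Build a per-query count used as the CTR denominator in craw_query_one_week(): exposure
# counts on the search pages, incremented once for every occurrence of the query in the
# query_words reported by report_status events.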
def get_keyword_ctr(start_ts,end_ts):
start_date = datetime.datetime.fromtimestamp(start_ts/1e3)
end_date = datetime.datetime.fromtimestamp(end_ts/1e3)
start_date_str = start_date.strftime("%Y%m%d")
end_date_str = end_date.strftime("%Y%m%d")
data_dic = {}
    # query term exposures (precise exposure events on the search pages)
baoguang_sql = """
SELECT card_id as query,count(*) as query_count FROM online.ml_community_precise_exposure_detail
WHERE partition_date>='{start_date}' AND partition_date<'{end_date}' and page_name in ('search_home','search_home_more','search_home_welfare','search_home_diary','search_home_wiki','search_home_post','search_home_hospital','search_home_doctor') group by query
""".format(start_date=str(start_date_str),end_date=str(end_date_str))
device_df = spark.sql(baoguang_sql)
device_df.show(1, False)
sql_res = device_df.collect()
print("-----------------------------------------------------------------------------")
for res in sql_res:
data_dic[res.query] = res.query_count
    # query term searches (query_words from report_status events)
query_sql = """
SELECT params['query_words'] as query_words
FROM online.bl_hdfs_maidian_updates
WHERE partition_date>='{start_date}' AND partition_date<'{end_date}'
AND action = 'report_status'
AND page_name in ('search_home','search_home_more','search_home_welfare','search_home_diary','search_home_wiki','search_home_post','search_home_hospital','search_home_doctor')
""".format(start_date=start_date_str,end_date=end_date_str)
device_df = spark.sql(query_sql)
device_df.show(1, False)
sql_res = device_df.collect()
for res in sql_res:
try:
res_json = json.loads(res.query_words)
# print(res_json, type(res_json))
except:
continue
for single_keyword in res_json:
data_count = data_dic.get(single_keyword)
if data_count:
data_dic[single_keyword] = data_dic[single_keyword] + 1
else:
data_dic[single_keyword] = 1
print(data_dic)
return data_dic
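# Build the rows for the per-query sheet: for every keyword appearing both in api_search_words
# (last 30 days) and in the monitored search-word set, its search pv/uv, an approximate CTR,
# and the ES counts from get_es_word(); monitored words with online content are then appended
# with zero pv/uv (which can duplicate rows from the first pass).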
def craw_query_one_week(data_index, start_ts, end_ts, week_num, last_week_num, year,word_count):
tag_names_list_week = []
one_month_ago = datetime.datetime.now().date() - datetime.timedelta(days=30)
search_keyword_dict = get_search_keywrod_dict()
sql = 'select keywords,sum(sorted) as nums,sum(uv) as uvs from api_search_words where is_delete = 0 and create_time >= "' + str(
one_month_ago) + '" group by keywords order by nums desc'
print(sql)
zhengxing_cursor.execute("set names 'UTF8'")
zhengxing_cursor.execute(sql)
data = zhengxing_cursor.fetchall()
tup_title = ("关键词", "近一周搜索内容ctr", "过去30天搜索pv", "过去30天搜索uv", "帖子-3星内容数量", "帖子-4星内容数量",
"帖子-5星内容数量", "帖子-6星内容数量", "近一周抓取数量", "近一周审核上线3星数量",
"近一周审核上线4星数量", "近一周审核上线5星数量", "近一周审核上线6星数量")
tag_names_list_week.append(tup_title)
for name in list(data):
word = name.get("keywords", None)
if word in search_keyword_dict:
sorteds = name.get("nums", 0)
uv = name.get("uvs", 0)
tractate_content_num = get_es_word(word, start_ts)
exposure_count = word_count.get(word)
if not exposure_count:
ctr = ""
else:
ctr = str(round(sorteds/exposure_count * 100, 2)) + "%"
new_line = [word,ctr, sorteds, uv]
tag_names_list_week.append(tuple(new_line + tractate_content_num))
for word in search_keyword_dict:
tractate_content_num = get_es_word(word, start_ts)
if tractate_content_num[0] != 0:
new_line = [word, 0, 0, 0]
tag_names_list_week.append(tuple(new_line + tractate_content_num))
return tag_names_list_week
if __name__ == "__main__":
    data_index, start_ts, end_ts, week_num, last_week_num, year = week_num()  # note: this rebinding shadows the week_num() function
    start_ts = 1597593600000  # hard-coded override of the computed week start: 2020-08-17 00:00 (UTC+8), in milliseconds
craw_one_week = get_how_word_crawler_count(data_index, start_ts, end_ts, week_num, last_week_num, year)
word_count = get_keyword_ctr(start_ts, end_ts)
    # number of hot-content items crawled in the past week
    print(craw_one_week)
    # per-query crawl details for the past week
    all_data_week = craw_query_one_week(data_index, start_ts, end_ts, week_num, last_week_num, year, word_count)
    path = "近1周数据统计结果.xls"  # send_email_tome() attaches /srv/apps/crawler/近1周数据统计结果.xls, so the script is presumably run from that directory
    exl = WritrExcel()
    exl.write_excel("热点内容抓取周报", tuple(craw_one_week))
    exl.write_excel("query抓取周报", tuple(all_data_week))
    exl.save_excel(path)
    print(u'创建' + path + u'文件成功')
send_email_tome()