1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# -*- coding:UTF-8 -*-
# @Time : 2020/8/11 9:54
# @File : push_crawler_data_to_mysql.py
# @email : litao@igengmei.com
# @author : litao
import redis
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
from crawler.crawler_sys.utils.trans_qiniu_img import write_data_into_mysql
# Elasticsearch client used by the functions below to read the "crawler-data-raw" index.
es_framework = Elasticsearch(hosts='172.16.32.37', port=9200)
# Redis connection (db 19). The set "article_id_list" kept here holds the ids of
# documents that were already written to MySQL (checked with SISMEMBER, added with SADD).
rds = redis.StrictRedis(host='172.16.40.164', port=6379, db=19, password='ReDis!GmTx*0aN12')
# NOTE(review): hosts and the Redis password are hard-coded in source control;
# consider loading them from configuration or environment variables.
# User ids handed to write_data_into_mysql together with every document;
# how they are used is decided inside that helper.
user_id_list = [33745191, 33745202, 33745231, 33745286, 33745295, 33745266, 33745315, 33745333, 33745346, 33745353,
33745327,
33745340, 33745355, 33745359, 33745364, 33745371, 33745395, 33745421, 33745433, 33745457]
def send_email(query_id_dict: Dict):
    """Send one content-review email per search keyword.

    Args:
        query_id_dict: Maps a search keyword to a collection of tractate
            (post) ids written for that keyword; callers in this file pass
            ``{keyword: {tractate_id: 1}}``. An empty dict or ``None``
            sends nothing.

    Returns:
        None. Sending is best-effort: any error while building or sending a
        mail is printed to stdout and not raised, and a failure for one
        keyword does not stop the remaining keywords from being processed.
    """
    if not query_id_dict:
        return
    for search_keyword in query_id_dict:
        # The try is per keyword so one failing mail cannot swallow the rest.
        try:
            title_str = "关键词%s帖子内容审核" % search_keyword
            body_str = (
                "\n问好:\n"
                "新的query:{search_keyword} 抓取内容需要审核,帖子号为\n\n"
            ).format(search_keyword=search_keyword)
            # Every id is followed by ", " (including the last one), which
            # keeps the mail body identical to what was sent before.
            body_str += "".join(
                str(tractate_id) + ", "
                for tractate_id in query_id_dict[search_keyword]
            )
            send_file_email("", "",
                            email_group=["<hongxu@igengmei.com>", "<yangjiayue@igengmei.com>",
                                         "<zhangweiwei@igengmei.com>", "<liuyiting@igengmei.com>"],
                            cc_group=["<duanyingrong@igengmei.com>", "<litao@igengmei.com>"],
                            email_msg_body_str=body_str, title_str=title_str)
            # Log text kept unchanged for anyone grepping existing logs.
            print("send to mysql")
        except Exception as e:
            print("send email error %s" % e)
def scan_es_to_mysql():
    """Push every crawler document not yet recorded in Redis to MySQL.

    Scans the ``crawler-data-raw`` index for documents that have a
    ``search_word`` field, keeps the ids that are not members of the Redis
    set ``article_id_list``, then fetches each of those documents again and
    passes it (with ``doc_id`` added) to ``write_data_into_mysql`` together
    with ``user_id_list``.

    Returns:
        None. Per-document fetch or write failures are printed to stdout and
        skipped; errors from the initial scan or from Redis propagate.
    """
    search_query = {
        "query": {
            "bool": {
                "filter": [],
                "must": [
                    {"exists": {"field": "search_word"}},
                ],
            }
        }
    }
    scan_res = scan(client=es_framework, query=search_query, index="crawler-data-raw")
    # Collect the candidate ids up front; the documents themselves are
    # re-fetched one at a time below.
    doc_id_list = [
        res["_id"]
        for res in scan_res
        if not rds.sismember("article_id_list", res["_id"])
    ]
    for doc_id in doc_id_list:
        try:
            data = es_framework.get_source(index="crawler-data-raw", id=doc_id)
        except Exception:
            print("can't fine %s from es" % doc_id)
            continue
        if not data:
            continue
        data["doc_id"] = doc_id
        try:
            tractate_id = write_data_into_mysql(data, user_id_list)
            # Report the id of the document that was just written, not the
            # last id seen by the scan loop above.
            print("write data %s %s into sql" % (tractate_id, doc_id))
        except Exception as e:
            print("send to mysql error %s" % e)
        # NOTE: recording the id in the Redis set "article_id_list" and the
        # batched review emails (send_email every 200 written documents) are
        # currently disabled here, so this function never marks a document as
        # done and never sends mail.
def send_one_data_to_mysql(_id):
    """Push a single crawler document, looked up by its Elasticsearch id, to MySQL.

    Args:
        _id: Elasticsearch ``_id`` of the document in ``crawler-data-raw``.

    When the write returns a truthy tractate id, the document id is added to
    the Redis set ``article_id_list`` and a review email is requested through
    ``send_email``. A failing write is printed to stdout and not raised.
    """
    lookup = {"query": {"bool": {"filter": [{"term": {"_id": _id}}]}}}
    hits = es_framework.search(body=lookup, index="crawler-data-raw")["hits"]["hits"]
    pending_review = {}
    if hits:
        hit = hits[0]
        es_doc_id = hit["_id"]
        # NOTE(review): the membership lookup is still issued, but its answer
        # is discarded, so the document is pushed even if it is already in
        # "article_id_list" — confirm this forced re-push is intended.
        rds.sismember("article_id_list", es_doc_id)
        source = hit["_source"]
        source["doc_id"] = es_doc_id
        new_tractate_id = None
        try:
            new_tractate_id = write_data_into_mysql(source, user_id_list)
            print("write data %s %s into sql" % (new_tractate_id, es_doc_id))
        except Exception as err:
            print("send to mysql error %s" % err)
        if new_tractate_id:
            # Mark the document as done before building the mail payload.
            rds.sadd("article_id_list", es_doc_id)
            pending_review[source["search_word"]] = {new_tractate_id: 1}
    # An empty payload makes send_email a no-op.
    send_email(pending_review)
if __name__ == "__main__":
    # Default run: push every crawler document not yet recorded in Redis.
    scan_es_to_mysql()
    # To push a single document by its Elasticsearch id instead, call for example:
    # send_one_data_to_mysql("zhihu_283857656_480262861")
    # send_one_data_to_mysql("zhihu_65123027_648018097")