# -*- coding:UTF-8 -*-
# @Time  : 2020/8/11 9:54
# @File  : push_crawler_data_to_mysql.py
# @email : litao@igengmei.com
# @author : litao
"""Push crawled documents from Elasticsearch into MySQL.

Documents are read from the ``crawler-data-raw`` index and handed to
``write_data_into_mysql``.  A Redis set (``article_id_list``) records the
Elasticsearch ids that have already been pushed, and ``send_email`` notifies
reviewers about newly created posts.
"""

from typing import Dict, List

import redis
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

from crawler.crawler_sys.utils.trans_qiniu_img import write_data_into_mysql
from maintenance.func_send_email_with_file import send_file_email

# Index holding the raw crawler output.
ES_INDEX = "crawler-data-raw"
# Redis set of Elasticsearch document ids that were already pushed.
PUSHED_IDS_KEY = "article_id_list"

es_framework = Elasticsearch(hosts='172.16.32.37', port=9200)
# NOTE(review): the Redis password is committed in plain text; it should be
# moved to configuration / a secret store and rotated.
rds = redis.StrictRedis(host='172.16.40.164', port=6379, db=19, password='ReDis!GmTx*0aN12')

# Candidate user ids passed to write_data_into_mysql for every document.
user_id_list = [33745191, 33745202, 33745231, 33745286, 33745295, 33745266, 33745315, 33745333, 33745346, 33745353,
                33745327, 33745340, 33745355, 33745359, 33745364, 33745371, 33745395, 33745421, 33745433, 33745457]


def send_email(query_id_dict: Dict):
    """Send one review e-mail per search keyword.

    Args:
        query_id_dict: mapping ``search keyword -> iterable of post ids``
            (callers in this file pass ``{keyword: {tractate_id: 1}}``).
            An empty or ``None`` value sends nothing.

    Any exception is caught and printed; the function never raises, so a
    mail failure cannot abort the push job.
    """
    try:
        if not query_id_dict:
            return
        for search_keyword in query_id_dict:
            title_str = "关键词%s帖子内容审核" % search_keyword
            body_str = """
            问好:
                新的query:{search_keyword} 抓取内容需要审核,帖子号为\n
            """.format(search_keyword=search_keyword, )
            for tractate_id in query_id_dict[search_keyword]:
                body_str += str(tractate_id) + ", "
                print("line25", str(tractate_id))
            # Recipient addresses are redacted in this copy of the file.
            send_file_email("", "", email_group=["‎", "‎", "‎", "‎"], cc_group=["‎", "‎"],
                            email_msg_body_str=body_str, title_str=title_str)
            print("send to mysql")
    except Exception as e:
        print("send email error %s" % e)


def scan_es_to_mysql():
    """Push every not-yet-pushed document that has a ``search_word`` field.

    Phase 1 scans the index and collects the ids that are not members of the
    Redis set.  Phase 2 re-fetches each document by id and writes it to MySQL.
    Failures on a single document are printed and the job moves on.

    Marking ids as pushed and sending the review e-mail are currently
    disabled (see the commented block below), so repeated runs push the same
    documents again.
    """
    search_query = {
        "query": {
            "bool": {
                "filter": [
                ],
                "must": [
                    {"exists": {"field": "search_word"}}
                ]
            }
        }
    }

    # Phase 1: collect the ids of documents that still have to be pushed.
    doc_id_list = []
    for res in scan(client=es_framework, query=search_query, index=ES_INDEX):
        if not rds.sismember(PUSHED_IDS_KEY, res["_id"]):
            doc_id_list.append(res["_id"])

    # Phase 2: fetch each document on its own and write it to MySQL.
    for doc_id in doc_id_list:
        try:
            data = es_framework.get_source(index=ES_INDEX, id=doc_id)
        except Exception:
            print("can't fine %s from es" % doc_id)
            continue
        if not data:
            continue
        data["doc_id"] = doc_id
        try:
            tractate_id = write_data_into_mysql(data, user_id_list)
        except Exception as e:
            print("send to mysql error %s" % e)
            continue
        # Log the id of the document actually written (previously this
        # printed the scan-loop variable, i.e. a different document).
        print("write data %s %s into sql" % (tractate_id, doc_id))

        # Disabled: dedup marking and batched review e-mails.  Re-enabling
        # needs ``query_id_dict = {}`` and ``count = 0`` before this loop and
        # a final ``send_email(query_id_dict)`` after it.
        # if tractate_id:
        #     rds.sadd("article_id_list", doc_id)
        #     search_word = data["search_word"]
        #     if not query_id_dict.get(search_word):
        #         query_id_dict[search_word] = {}
        #     query_id_dict[search_word][tractate_id] = 1
        #     count += 1
        #     if count % 200 == 0:
        #         send_email(query_id_dict)
        #         query_id_dict = {}


def send_one_data_to_mysql(_id, force=True):
    """Push a single Elasticsearch document to MySQL and notify reviewers.

    Args:
        _id: Elasticsearch ``_id`` of the document to push.
        force: when true (the default, matching the previous behaviour) the
            document is pushed even if its id is already in the Redis set.
            Pass ``False`` to skip documents that were pushed before.

    On a successful write the id is added to the Redis set and a review
    e-mail is sent for the document's ``search_word``.  A failed write is
    printed and nothing else happens.

    Raises:
        KeyError: if the document was written but has no ``search_word``.
    """
    search_query = {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"_id": _id}}
                ]
            }
        }
    }
    search_res = es_framework.search(body=search_query, index=ES_INDEX)
    hits = search_res["hits"]["hits"]
    if not hits:
        return

    res = hits[0]
    if not force and rds.sismember(PUSHED_IDS_KEY, res["_id"]):
        return

    data = res["_source"]
    data["doc_id"] = res["_id"]
    tractate_id = None
    try:
        tractate_id = write_data_into_mysql(data, user_id_list)
        print("write data %s %s into sql" % (tractate_id, res["_id"]))
    except Exception as e:
        print("send to mysql error %s" % e)

    if tractate_id:
        rds.sadd(PUSHED_IDS_KEY, res["_id"])
        search_word = data["search_word"]
        send_email({search_word: {tractate_id: 1}})


if __name__ == "__main__":
    scan_es_to_mysql()
    # Manual single-document pushes, e.g.:
    # send_one_data_to_mysql("zhihu_283857656_480262861")
    # send_one_data_to_mysql("zhihu_65123027_648018097")