Commit 4cbd0143 authored by litaolemo's avatar litaolemo

增加query词抓取数据入库

parent eb68175b
......@@ -14,8 +14,9 @@ es_framework = Elasticsearch(hosts='172.16.32.37', port=9200)
rds = redis.StrictRedis(host='172.16.40.164', port=6379, db=19, password='ReDis!GmTx*0aN12')
# rds = redis.StrictRedis(host='172.16.40.164', port=6379, db=19, password='ReDis!GmTx*0aN12')
user_id_list = [33745191,33745202,33745231,33745286,33745295,33745266,33745315,33745333,33745346,33745353,33745327,
33745340,33745355,33745359,33745364,33745371,33745395,33745421,33745433,33745457]
user_id_list = [33745191, 33745202, 33745231, 33745286, 33745295, 33745266, 33745315, 33745333, 33745346, 33745353,
33745327,
33745340, 33745355, 33745359, 33745364, 33745371, 33745395, 33745421, 33745433, 33745457]
def send_email(query_id_dict: Dict):
......@@ -26,13 +27,13 @@ def send_email(query_id_dict: Dict):
body_str = """
问好:
新的query:{search_keyword}抓取内容需要审核,帖子号为\n
""".format(search_keyword=search_keyword,)
""".format(search_keyword=search_keyword, )
for tractate_id in query_id_dict[search_keyword]:
body_str += tractate_id + ", "
print("line25", tractate_id)
send_file_email("", "",
email_group=["‎<hongxu@igengmei@igengmei.com>", "‎<yangjiayue@igengmei.com>",
email_group=["‎<hongxu@igengmei.com>", "‎<yangjiayue@igengmei.com>",
"‎<zhangweiwei@igengmei.com>", "‎<liuyiting@igengmei.com>"],
cc_group=["‎<duanyingrong@igengmei.com>", "‎<litao@igengmei.com>"],
email_msg_body_str=body_str, title_str=title_str)
......@@ -45,23 +46,25 @@ def scan_es_to_mysql():
query_id_dict = {}
search_query = {
"query": {
"bool": {
"filter": [
], "must": [
{"exists": {"field": "search_word"}}
]
}
"bool": {
"filter": [
], "must": [
{"exists": {"field": "search_word"}}
]
}
}
}
scan_res = scan(client=es_framework,query=search_query,index="crawler-data-raw")
scan_res = scan(client=es_framework, query=search_query, index="crawler-data-raw")
for res in scan_res:
if_exists = rds.sismember("article_id_list",res["_id"])
if_exists = rds.sismember("article_id_list", res["_id"])
tractate_id = None
if not if_exists:
data = res["_source"]
data["doc_id"] = res["_id"]
try:
tractate_id = write_data_into_mysql(data,user_id_list)
tractate_id = write_data_into_mysql(data, user_id_list)
print("write data %s %s into sql" % (tractate_id, res["_id"]))
rds.sadd("article_id_list", res["_id"])
except Exception as e:
print("send to mysql error %s" % e)
if tractate_id:
......@@ -70,8 +73,38 @@ def scan_es_to_mysql():
send_email(query_id_dict)
if __name__ == "__main__":
scan_es_to_mysql()
def send_one_data_to_mysql(_id):
query_id_dict = {}
search_query = {
"query": {
"bool": {
"filter": [
{"term": {"_id": _id}}
]
}
}
}
search_res = es_framework.search(body=search_query, index="crawler-data-raw")
if search_res["hits"]["hits"]:
res = search_res["hits"]["hits"][0]
if_exists = rds.sismember("article_id_list", res["_id"])
tractate_id = None
if not if_exists:
data = res["_source"]
data["doc_id"] = res["_id"]
try:
tractate_id = write_data_into_mysql(data, user_id_list)
print("write data %s %s into sql"% (tractate_id,res["_id"]))
rds.sadd("article_id_list", res["_id"])
except Exception as e:
print("send to mysql error %s" % e)
if tractate_id:
search_word = data["search_word"]
query_id_dict[search_word][tractate_id] = 1
send_email(query_id_dict)
if __name__ == "__main__":
# scan_es_to_mysql()
send_one_data_to_mysql("zhihu_283857656_480262861")
......@@ -392,8 +392,14 @@ class Crawler_toutiao():
try:
title = one_line['title']
abstract = one_line['abstract']
url = one_line['article_url']
try:
abstract = one_line['abstract']
except:
abstract = ""
if one_line.get('article_url'):
url = one_line['article_url']
elif one_line.get('url'):
url = one_line['url']
play_count = one_line['read_count']
comment_count = one_line['comment_count']
favorite_count = one_line['digg_count']
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment