Commit 42aa9e6d authored by litaolemo

update redis: change the Redis address

parent c95a181f
@@ -8,7 +8,7 @@ import redis, json
from crawler_sys.framework.platform_redis_register import get_redis_list_name
from crawler_sys.framework.es_crawler import scan_crawler_url_register
-rds = redis.StrictRedis(host='154.8.190.251', port=6379, db=19)
rds = redis.StrictRedis(host='172.18.51.10', port=6379, db=19)
def feed_url_into_redis(dict_Lst, expire=0,
...
@@ -14,7 +14,7 @@ PARSER = argparse.ArgumentParser(description='video platform search page crawler
#                     '/crawler_sys/framework/config'
#                     '/search_keywords.ini'),
#                     help=('config file absolute path'))
-PARSER.add_argument('-p', '--platform', default=["toutiao","腾讯新闻", "腾讯视频", "new_tudou"], action='append',
PARSER.add_argument('-p', '--platform', default=["toutiao", "腾讯新闻", "腾讯视频", "new_tudou"], action='append',
                    help=('legal platform name is required'))
PARSER.add_argument('-k', '--key_word_platform', default=[], action='append',
                    help=('key_word_legal platform name is required'))
@@ -29,8 +29,8 @@ ARGS = PARSER.parse_args()
es_framework = Elasticsearch(hosts='192.168.17.11', port=80,
                             http_auth=('crawler', 'XBcasfo8dgfs'))
-index_target_releaser = 'search_keywords'
# index_target_releaser = 'search_keywords'
-doc_type_target_releaser = 'doc'
# doc_type_target_releaser = 'doc'
# index_target_releaser = 'test2'
# doc_type_target_releaser = 'keywrod'
@@ -52,31 +52,82 @@ OUTPUT_TO_ES_REGISTER = ARGS.output_to_es_register
def func_search_keywordlist(platform):
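    # Returns a hard-coded keyword -> max-search-pages mapping; every keyword is crawled for 10 search pages.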
-    search_body = {"query": {"bool": {"filter": []}}}
-    search_resp = es_framework.search(index=index_target_releaser,
-                                      doc_type=doc_type_target_releaser,
-                                      body=search_body,
-                                      size=0,
-                                      request_timeout=100)
-    total_hit = search_resp['hits']['total']
-    releaser_dic = {}
-    if total_hit > 0:
-        print('Got %d releaser for platform %s.' % (total_hit, platform))
-        scan_resp = scan(client=es_framework, query=search_body,
-                         index=index_target_releaser,
-                         doc_type=doc_type_target_releaser,
-                         request_timeout=200)
-        for line in scan_resp:
-            try:
-                title = line['_source']['title']
-                page = line['_source']['page']
-                releaser_dic[title] = page
-            except:
-                print('error in :', line)
-                continue
-    else:
-        print('Got zero hits.')
-    return releaser_dic
    res_dic = {}
    res_list = ["比基尼线脱毛",
                "嗨体泪沟",
                "根据脸型选发型",
                "圆脸适合什么发型",
                "5热玛吉",
                "耳软骨假体鼻综合",
                "肉毒素去法令纹",
                "吸脂瘦腹部",
                "嗨体填充泪沟",
                "6d小脸针",
                "水剥离",
                "嗨体去颈纹",
                "胶原蛋白填充泪沟",
                "吸脂瘦全身",
                "肉毒素去狐臭",
                "吸脂瘦腰部",
                "fotona4d",
                "嘴综合",
                "胸部下垂矫正",
                "5g天使光雕",
                "唇综合",
                "SVF-gel脂肪胶",
                "嘴角上扬术",
                "嗨体注射",
                "脂肪填充修复",
                "比基尼脱毛",
                "lams吸脂",
                "脂肪填充面部年轻化",
                "嗨体",
                "吸脂祛副乳",
                "m22",
                "胸部提升",
                "fotona",
                "O型腿矫正",
                "肋骨鼻",
                "欣颜",
                "唯颜",
                "垫眉骨",
                "咬肌切除",
                "背部吸脂",
                "m22王者之冠",
                "bbl",
                "胶原蛋白填充祛黑眼圈",
                ]
    for l in res_list:
        res_dic[l] = 10
    return res_dic
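# The previous Elasticsearch-based version of func_search_keywordlist is kept below, commented out.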
# def func_search_keywordlist(platform):
#     search_body = {"query": {"bool": {"filter": []}}}
#     search_resp = es_framework.search(index=index_target_releaser,
#                                       doc_type=doc_type_target_releaser,
#                                       body=search_body,
#                                       size=0,
#                                       request_timeout=100)
#     total_hit = search_resp['hits']['total']
#     releaser_dic = {}
#     if total_hit > 0:
#         print('Got %d releaser for platform %s.' % (total_hit, platform))
#         scan_resp = scan(client=es_framework, query=search_body,
#                          index=index_target_releaser,
#                          doc_type=doc_type_target_releaser,
#                          request_timeout=200)
#         for line in scan_resp:
#             try:
#                 title = line['_source']['title']
#                 page = line['_source']['page']
#                 releaser_dic[title] = page
#             except:
#                 print('error in :', line)
#                 continue
#     else:
#         print('Got zero hits.')
#     return releaser_dic
if OUTPUT_TO_ES_RAW is True:
@@ -99,20 +150,13 @@ def search_page_task(platform, output_to_es_raw,
        print("search keyword '%s' on platform %s" % (keyword, platform))
        search_pages = int(KEYWORD_dic[keyword])
        try:
-            if platform != "腾讯新闻":
-                crawler.search_page(keyword=keyword,
-                                    search_pages_max=search_pages,
-                                    output_to_es_raw=output_to_es_raw,
-                                    output_to_es_register=output_to_es_register,
-                                    es_index=es_index,
-                                    doc_type=doc_type)
-            else:
-                crawler.search_video_page(keyword, None,
-                                          search_pages_max=search_pages,
-                                          output_to_es_raw=output_to_es_raw,
-                                          output_to_es_register=output_to_es_register,
-                                          es_index=es_index,
-                                          doc_type=doc_type,releaser=False)
            crawler.search_page(keyword=keyword,
                                search_pages_max=search_pages,
                                output_to_es_raw=output_to_es_raw,
                                output_to_es_register=output_to_es_register,
                                es_index=es_index,
                                doc_type=doc_type)
        except Exception as e:
            print(e)
            continue
@@ -120,15 +164,16 @@ def search_page_task(platform, output_to_es_raw,
result = []
kwargs_dict = {
    'output_to_es_raw': OUTPUT_TO_ES_RAW,
    'output_to_es_register': OUTPUT_TO_ES_REGISTER,
    'es_index': ES_INDEX,
    'doc_type': DOC_TYPE,
}
pool = Pool(processes=4)
for platform in PLATFORM_LIST:
-    res = pool.apply_async(func=search_page_task, args=(platform,OUTPUT_TO_ES_RAW,OUTPUT_TO_ES_REGISTER,ES_INDEX,DOC_TYPE))
    res = pool.apply_async(func=search_page_task,
                           args=(platform, OUTPUT_TO_ES_RAW, OUTPUT_TO_ES_REGISTER, ES_INDEX, DOC_TYPE))
    result.append(res)
pool.close()
pool.join()
...
@@ -44,7 +44,7 @@ from redis.sentinel import Sentinel
# # Connect to the database
# rds_1 = sentinel.master_for('ida_redis_master', socket_timeout=1, db=1, decode_responses=True)
-rds_1 = redis.StrictRedis(host='154.8.190.251', port=6379, db=19, decode_responses=True)
rds_1 = redis.StrictRedis(host='172.18.51.10', port=6379, db=19, decode_responses=True)
parser = argparse.ArgumentParser(description='Specify a platform name.')
parser.add_argument('-n', '--max_page', default=30, type=int,
...
@@ -34,7 +34,7 @@ from redis.sentinel import Sentinel
# Connect to the database
# rds = sentinel.master_for('ida_redis_master', socket_timeout=0.5, db=1, decode_responses=True)
-rds = redis.StrictRedis(host='154.8.190.251', port=6379, db=19, decode_responses=True)
rds = redis.StrictRedis(host='172.18.51.10', port=6379, db=19, decode_responses=True)
parser = argparse.ArgumentParser(description='Specify a platform name.')
parser.add_argument('-p', '--platform', default=[], action='append',
...
@@ -23,7 +23,7 @@ import kdl, requests
# slave = sentinel.discover_slaves('ida_redis_master')
# # Connect to the database
# rds = sentinel.master_for('ida_redis_master', socket_timeout=0.5, db=7, decode_responses=True)
-rds = redis.StrictRedis(host='154.8.190.251', port=6379, db=18, decode_responses=True)
rds = redis.StrictRedis(host='172.18.51.10', port=6379, db=18, decode_responses=True)
def get_proxy_from_redis():
...
@@ -107,6 +107,21 @@ class Crawler_zhihu():
        print(requests_res.cookies.get_dict())
        return requests_res.cookies.get_dict()
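    # Dispatch on the type of a Zhihu search-result card; the branches below are still largely placeholders.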
    def parse_sigle_page(self, aid, data_dict, article_type):
        if article_type == "knowledge_ad":
            pass
        elif article_type == "zvideo":
            pass
        elif article_type == "search_result":
            article_type = data_dict["object"]["type"]
            url = data_dict["object"]["type"]
        elif article_type == "search_club":
            pass
        elif article_type == "relevant_query":
            pass
        else:
            pass
    def search_article_page(self, keyword, search_pages_max=12,
                            output_to_es_raw=False,
                            output_to_es_register=False,
...
@@ -7,8 +7,11 @@ Created on Tue May 15 13:59:43 2018
import json
import datetime
import random
import time
import re
from typing import Dict, List
import pymysql
import requests
from elasticsearch.exceptions import TransportError
from crawler_sys.framework.redis_interact import feed_url_into_redis
@@ -20,12 +23,30 @@ from crawler_sys.framework.es_ccr_index_defination import fields_url_register
from write_data_into_es.func_cal_doc_id import cal_doc_id
from crawler_sys.utils.write_into_file import write_str_into_file
from crawler.crawler_sys.proxy_pool.func_get_proxy_form_kuaidaili import get_proxy
from lxml import html
from lxml.html.clean import Cleaner
from crawler.gm_upload.gm_upload import upload, upload_file
index_site_crawler = 'crawler-data-raw'
doc_type_site_crawler = 'doc'
# Instantiate the MySQL connection object
class mysql_conn():
    def __init__(self, mysql_name):
        if mysql_name == "test":
            self.conn = pymysql.connect(host='bj-cdb-6slgqwlc.sql.tencentcdb.com', port=62120, user='work',
                                        passwd='Gengmei1',
                                        db='mimas_test', charset='utf8')
        elif mysql_name == "mimas":
            self.conn = pymysql.connect(host='172.16.30.138', port=3306, user='mimas',
                                        passwd='GJL3UJe1Ck9ggL6aKnZCq4cRvM',
                                        db='mimas_prod', charset='utf8mb4')
        self.cur = self.conn.cursor()
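# Example usage (hypothetical, not part of this commit): mysql = mysql_conn("mimas"); tractate_id = write_data_into_mysql(res_data, mysql)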
def form_data_Lst_for_url_register(data_Lst_ori):
    ts = int(datetime.datetime.now().timestamp() * 1e3)
    data_Lst_reg = []
    for line in data_Lst_ori:
        try:
@@ -41,42 +62,241 @@ def form_data_Lst_for_url_register(data_Lst_ori):
    return data_Lst_reg
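# Bulk-indexes hot-word documents into Elasticsearch, flushing every 500 records and once more at the end.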
def hot_words_output_result(result_Lst, output_index="short-video-hotwords"):
    bulk_all_body = ""
    for count, result in enumerate(result_Lst):
        doc_id = result["platform"] + "_" + result["title"]
        bulk_head = '{"index": {"_id":"%s"}}' % doc_id
        data_str = json.dumps(result, ensure_ascii=False)
        bulk_one_body = bulk_head + '\n' + data_str + '\n'
        bulk_all_body += bulk_one_body
        if count % 500 == 0 and count > 0:
            eror_dic = es_site_crawler.bulk(index=output_index,
                                            body=bulk_all_body, request_timeout=200)
            bulk_all_body = ''
            if eror_dic['errors'] is True:
                print(eror_dic['items'])
                print(bulk_all_body)
            print(count)
    if bulk_all_body != '':
        eror_dic = es_site_crawler.bulk(body=bulk_all_body,
                                        index=output_index,
                                        request_timeout=200)
        if eror_dic['errors'] is True:
            print(eror_dic)
WHITE_TAGS = {
    "basic": ["div", "p", "span", "img", "br", "video", 'a'],  # tentatively used for the mini-program and crawled data
    "all": [
        "div", "p", "span", "img", "br", "video", "audio", "a", "b", "strong", "i", "ul", "ol", "li", "em", "h1",
        "h2", "h3", "h4", "h5", "h6", "iframe",
    ]  # all whitelisted tags that may be displayed
}
img_type = {
    "OTHER": 1,  # other image types
    "GIF": 2,    # GIF animation
    "JPG": 3,    # JPG image
    "JPEG": 4,   # JPEG image
    "PNG": 5,    # PNG image
    "BMP": 6,    # BMP bitmap
    "WEBP": 7,   # WEBP image
    "TIFF": 8,   # TIFF image
}
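# These codes are stored in api_tractate_images.image_type below; WEBP (7) additionally gets an image_webp URL column.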
def gm_convert_html_tags(rich_text, all_tags=False, remove_tags=None):
    """
    Re-clean rich-text content and strip unwanted styling.
    :param rich_text: the rich text
    :param all_tags: whether to match every tag in the whitelist
    :param remove_tags: whitelisted tags that should be removed, as a list
    :return:
    """
    if not rich_text:
        return ""
    # rich_text = _get_rich_text(rich_text)
    # tag cleaning + completion parameters
    tags = WHITE_TAGS["all"] if all_tags else WHITE_TAGS["basic"]
    if remove_tags:
        tags = [tag for tag in tags if tag not in remove_tags]
    kw = {
        "remove_unknown_tags": False,
        "allow_tags": tags,
        "safe_attrs": ["src", ],
    }
    if "a" in tags:
        kw["safe_attrs"].append("href")
    elif all_tags:
        kw["safe_attrs"].extend(["class", "style"])
    if "iframe" in kw["allow_tags"]:
        kw["embedded"] = False
    clear = Cleaner(**kw)
    rich_text = clear.clean_html(rich_text)
    # add styling
    element_obj = html.fromstring(rich_text)
    for element in element_obj.xpath(u"//img|//video"):
        if not all_tags:  # mini-program, ordinary users, crawled data
            element.attrib["width"] = "100%"  # give images and videos 100% width
        if element.tag == "video" and all_tags:
            element.attrib["class"] = "js_richtext_video"
    # remove <a> tags whose link does not start with gengmei://
    for item in element_obj.xpath("//a[not(starts-with(@href, 'gengmei://'))]"):
        item.getparent().remove(item)
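    # Note: removing an <a> element here drops its entire subtree, including any nested images.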
    # append styling to <a> tags
    for item in element_obj.xpath("//a"):
        item.attrib["style"] = 'color:#3FB5AF'  # link color
    rich_text = html.tostring(element_obj, encoding="unicode")
    return rich_text
def push_data_to_user(res_data: Dict) -> Dict:
    """
    Process the data into a format that can be written to the database.
    :param res_data:
    :return:
    """
    qiniu_img_list = []
    if res_data["img_list"]:
        for img_url in res_data["img_list"]:
            try:
                img_wb = retry_get_url(img_url).content
                res = upload(img_wb)
                print(res)
                img_info = retry_get_url(res + "-imageinfo")
                img_info_json = img_info.json()
                qiniu_img_list.append((res + "-w", img_info_json))
            except Exception as e:
                print("down load img error %s" % e)
                return {}
    # replace the images
    if res_data["platform"] == "weibo":
        res_data["qiniu_img_list"] = qiniu_img_list
        if "http://t.cn/" in res_data["title"]:
            res_data["title"] = res_data["title"].split("http://t.cn/")[0]
        res_data["content"] = res_data["title"]
    elif res_data["platform"] == "douban":
        content = res_data.get("content")
        if content:
            for count, img_url in enumerate(res_data["img_list"]):
                # print(qiniu_img_list[count][0])
                content = content.replace(img_url, qiniu_img_list[count][0])
        res_data["qiniu_img_list"] = qiniu_img_list
        res_data["content"] = content
    if res_data["platform"] == "weibo":
        res_data["content"] = gm_convert_html_tags(res_data["title"], all_tags=True)
        res_data["title"] = ""
    elif res_data["platform"] == "douban":
        res_data["content"] = gm_convert_html_tags(res_data["content"], all_tags=True)
    return res_data
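# Writes one cleaned post into api_tractate and its images into api_tractate_images; returns the new tractate_id, or None if nothing was inserted.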
def write_data_into_mysql(res_data, mysql):
    now_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # clean the data into a format that can be inserted
    data = push_data_to_user(res_data)
    if not data.get("content"):
        return None
    if not data.get("qiniu_img_list"):
        return None
    tractate_id = 0
    try:
        sql_query = """insert into api_tractate
        (user_id,content,is_online,status,platform,content_level,is_excellent,create_time,last_modified,user_del,low_quality,low_quality_deal,platform_id,pgc_type,title)
        values ({user_id},'{content}',{is_online},{status},{platform},{content_level},{is_excellent},'{create_time}','{last_modified}',{user_del},{low_quality},{low_quality_deal},'{platform_id}',{pgc_type},'{title}');""".format(
            user_id=random.choice(user_id_list), content=data["content"], is_online=0, status=2, platform=15,
            content_level=data["level"],
            is_excellent=0, create_time=now_str,
            last_modified=now_str, user_del=0,
            low_quality=0, low_quality_deal=0, platform_id=data["doc_id"], pgc_type=0, title=data["title"])
        res = mysql.cur.execute(sql_query)
        tractate_id = int(mysql.conn.insert_id())
        if res:
            mysql.conn.commit()
    except Exception as e:
        print("commit error %s" % e)
        print(data)
        mysql.conn.rollback()
    if data.get("qiniu_img_list"):
        for img_info in data.get("qiniu_img_list"):
            if img_info[0] in data.get("content"):
                image_url_source = 2
            else:
                image_url_source = 3
            try:
                image_type = img_type.get(img_info[1]["format"].upper())
            except:
                image_type = 1
            try:
                width = img_info[1]["width"]
                height = img_info[1]["height"]
            except:
                width = 0
                height = 0
            try:
                if image_type == 7:
                    sql_query = """
                    insert into api_tractate_images (tractate_id,image_url,width,height,image_webp,image_url_source,image_type,create_time,update_time)
                    values ({tractate_id},'{image_url}',{width},{height},'{image_webp}',{image_url_source},{image_type},'{create_time}','{update_time}')
                    """.format(tractate_id=tractate_id, image_url=img_info[0], width=width,
                               height=height, image_url_source=image_url_source,
                               image_type=image_type, image_webp=img_info[0],
                               create_time=now_str, update_time=now_str)
                else:
                    sql_query = """
                    insert into api_tractate_images (tractate_id,image_url,width,height,image_url_source,image_type,create_time,update_time)
                    values ({tractate_id},'{image_url}',{width},{height},{image_url_source},{image_type},'{create_time}','{update_time}')
                    """.format(tractate_id=tractate_id, image_url=img_info[0], width=width,
                               height=height, image_url_source=image_url_source, image_type=image_type,
                               create_time=now_str, update_time=now_str)
                res = mysql.cur.execute(sql_query)
                if res:
                    mysql.conn.commit()
            except Exception as e:
                print("commit error %s" % e)
                mysql.conn.rollback()
    if tractate_id:
        return tractate_id
    else:
        return None
def output_result(result_Lst, platform,
                  output_to_file=False, filepath=None,
                  output_to_es_raw=False,
                  output_to_es_register=False,
                  push_to_redis=False,
-                  batch_str=None,
                  output_to_test_mysql=False,
-                  release_time_lower_bdr=None,
                  output_to_mimas_mysql=False,
                  es_index=index_site_crawler, **kwargs):
    # write data into es crawler-raw index
    if output_to_es_raw:
        bulk_write_into_es(result_Lst, es_index)
@@ -88,36 +308,38 @@ def output_result(result_Lst, platform,
                           index=es_index,
                           construct_id=True,
                           platform=platform,
                           )
    if output_to_test_mysql:
        pass
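    # The new MySQL output flags are accepted here but not acted on yet.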
    # feed url into redis
    if push_to_redis:
        feed_url_into_redis(
            result_Lst, expire=kwargs.get("expire"))
    # output into file according to passed in parameters
    if output_to_file is True and filepath is not None:
        output_fn = ('crawler_%s_on_%s_json'
                     % (platform, datetime.datetime.now().isoformat()[:10]))
        output_f = open(filepath + '/' + output_fn, 'a', encoding='utf-8')
        write_into_file(result_Lst, output_f)
        output_f.close()
    else:
        return result_Lst
def retry_get_url(url, retrys=3, proxies=None, timeout=10, **kwargs):
    retry_c = 0
    while retry_c < retrys:
        try:
            if proxies:
                proxies_dic = get_proxy(proxies)
                if not proxies_dic:
                    get_resp = requests.get(url, timeout=timeout, **kwargs)
                else:
                    get_resp = requests.get(url, proxies=proxies_dic, timeout=timeout, **kwargs)
            else:
                get_resp = requests.get(url, timeout=timeout, **kwargs)
            return get_resp
        except Exception as e:
            retry_c += 1
@@ -151,7 +373,7 @@ def bulk_write_into_es(dict_Lst,
    bulk_write_body = ''
    write_counter = 0
    def bulk_write_with_retry_UnicodeEncodeError(index, bulk_write_body,
                                                 retry_counter_for_UnicodeEncodeError):
        if bulk_write_body != '':
            try:
@@ -159,7 +381,7 @@ def bulk_write_into_es(dict_Lst,
                                 body=bulk_write_body,
                                 request_timeout=100)
                bulk_write_body = ''
                # print(bulk_write_resp)
                print('Writing into es done')
            except UnicodeEncodeError as ue:
                print('Got UnicodeEncodeError, will remove ill formed string and retry.')
@@ -167,11 +389,11 @@ def bulk_write_into_es(dict_Lst,
                UnicodeEncodeError_msg = ue.__str__()
                ill_str_idxs = get_ill_encoded_str_posi(UnicodeEncodeError_msg)
                if len(ill_str_idxs) == 2:
                    ill_str = bulk_write_body[ill_str_idxs[0]: ill_str_idxs[1] + 1]
                    bulk_write_body = bulk_write_body.replace(ill_str, '')
                    bulk_write_with_retry_UnicodeEncodeError(index, bulk_write_body,
                                                             retry_counter_for_UnicodeEncodeError
                                                             )
            except TransportError:
                print("output to es register error")
                write_str_into_file(file_path='/home/',
@@ -182,7 +404,7 @@ def bulk_write_into_es(dict_Lst,
    for line in dict_Lst:
        write_counter += 1
        if construct_id and platform is not None:
            doc_id = cal_doc_id(platform, url=line["url"], doc_id_type='all-time-url', data_dict=line)
            action_str = ('{ "index" : { "_index" : "%s", "_id" : "%s" } }'
                          % (index, doc_id))
        else:
@@ -191,15 +413,15 @@ def bulk_write_into_es(dict_Lst,
        data_str = json.dumps(line, ensure_ascii=False)
        line_body = action_str + '\n' + data_str + '\n'
        bulk_write_body += line_body
        if write_counter % 1000 == 0 or write_counter == len(dict_Lst):
            print('Writing into es %s %d/%d' % (index,
                                                write_counter,
                                                len(dict_Lst)))
    if bulk_write_body != '':
        retry_counter_for_UnicodeEncodeError = 0
        retry_counter_for_UnicodeEncodeError = bulk_write_with_retry_UnicodeEncodeError(index,
                                                                                        bulk_write_body,
                                                                                        retry_counter_for_UnicodeEncodeError)
        bulk_write_body = ''
@@ -215,7 +437,7 @@ def load_json_file_into_dict_Lst(filename, path):
    if path[-1] != '/':
        path += '/'
    data_Lst = []
    with open(path + filename, 'r', encoding='utf-8') as f:
        for line in f:
            line_d = json.loads(line)
            if 'data_provider' not in line_d:
@@ -234,7 +456,7 @@ def crawl_a_url_and_update_redis(url, platform, urlhash, processID=-1):
    # perform crawling, get the data
    # write es or output to files
    # update redis
    ts = int(datetime.datetime.now().timestamp() * 1e3)
    redis_hmset_dict = {'push_time': ts, 'is_fetched': 1,
                        'url': url, 'platform': platform}
    rds.hmset(urlhash, redis_hmset_dict)
@@ -254,7 +476,7 @@ def scan_redis_to_crawl():
    scan_counter = 0
    while True:
        scan_counter += 1
        if scan_counter % 5 == 0:
            print(scan_counter, 'cur:', cur, datetime.datetime.now())
        cur, hash_keys = rds.scan(cur)
        for urlhash in hash_keys:
...
@@ -4,7 +4,7 @@
import redis,time,json,datetime,sys
from maintenance.func_send_email_with_file import send_file_email
-rds = redis.StrictRedis(host='154.8.190.251', port=6379, db=19,decode_responses=True)
rds = redis.StrictRedis(host='172.18.51.10', port=6379, db=19,decode_responses=True)
def write_email_task_to_redis(task_name=None,file_path=None, data_str=None, email_group=[],
...