Commit 4362749f authored by litaolemo

update

parent fee41916
......@@ -8,11 +8,16 @@ import redis, json
from crawler_sys.framework.platform_redis_register import get_redis_list_name
from crawler_sys.framework.es_crawler import scan_crawler_url_register
rds = redis.StrictRedis(host='172.16.40.164', port=6379, db=19, password='ReDis!GmTx*0aN12')
def redis_path(redis_type=""):
if redis_type == "on_line":
rds = redis.StrictRedis(host='172.16.40.164', port=6379, db=19, password='ReDis!GmTx*0aN12')
else:
rds = redis.StrictRedis(host='172.18.51.10', port=6379, db=19)
return rds
def feed_url_into_redis(dict_Lst, expire=0,
):
def feed_url_into_redis(dict_Lst, expire=0,rds=redis_path):
"""
release_time_lower_bdr must be an int value representing a
timestamp in milliseconds, if given.
......
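As a side note, a minimal sketch of how the reworked redis_interact.py is meant to be used: redis_path picks the production or test Redis, and callers hand the resolved client to feed_url_into_redis (the new default in the signature is the helper function itself, not a connection, so callers are expected to pass rds explicitly). Connection details are copied from the diff; the call site is illustrative.

```python
import redis

def redis_path(redis_type=""):
    # "on_line" selects the production instance, anything else the test instance
    if redis_type == "on_line":
        return redis.StrictRedis(host='172.16.40.164', port=6379, db=19,
                                 password='ReDis!GmTx*0aN12')
    return redis.StrictRedis(host='172.18.51.10', port=6379, db=19)

rds = redis_path("on_line")
# feed_url_into_redis(url_dict_list, expire=3600, rds=rds)  # illustrative call
```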
......@@ -16,20 +16,18 @@ PARSER = argparse.ArgumentParser(description='video platform search page crawler
# '/crawler_sys/framework/config'
# '/search_keywords.ini'),
# help=('config file absolute path'))
PARSER.add_argument('-p', '--platform', default=["toutiao","腾讯新闻", "腾讯视频", "new_tudou"], action='append',
PARSER.add_argument('-p', '--platform', default=["toutiao","weibo", "zhihu"], action='append',
help=('legal platform name is required'))
PARSER.add_argument('-k', '--key_word_platform', default=[], action='append',
help=('key_word_legal platform name is required'))
PARSER.add_argument('-w', '--output_to_es_raw', default=True,
PARSER.add_argument('-w', '--output_to_es_raw', default=False,
help=('output to es raw'))
PARSER.add_argument('-g', '--output_to_es_register', default=False,
PARSER.add_argument('-g', '--output_to_es_register', default=True,
help=('output to es register'))
PARSER.add_argument('-n', '--maxpage', default=20,
help=('maxpage'))
ARGS = PARSER.parse_args()
es_framework = Elasticsearch(hosts='192.168.17.11', port=80,
http_auth=('crawler', 'XBcasfo8dgfs'))
......@@ -41,45 +39,92 @@ if ARGS.platform != []:
# "program will exit" % platform)
# sys.exit(0)
# CONFIG = configparser.ConfigParser()
# CONFIG.read(ARGS.conf, encoding='utf-8')
OUTPUT_TO_ES_RAW = ARGS.output_to_es_raw
OUTPUT_TO_ES_REGISTER = ARGS.output_to_es_register
#
# def func_search_keywordlist(platform):
# search_body = {"query": {"bool": {"filter": []}}}
# search_resp = es_framework.search(index=index_target_releaser,
# doc_type=doc_type_target_releaser,
# body=search_body,
# size=0,
# request_timeout=100)
# total_hit = search_resp['hits']['total']
# releaser_dic = {}
# if total_hit > 0:
# print('Got %d releaser for platform %s.' % (total_hit, platform))
# scan_resp = scan(client=es_framework, query=search_body,
# index=index_target_releaser,
# doc_type=doc_type_target_releaser,
# request_timeout=200)
# for line in scan_resp:
# try:
# title = line['_source']['title']
# page = line['_source']['page']
# releaser_dic[title] = page
# except:
# print('error in :', line)
# continue
# else:
# print('Got zero hits.')
# return releaser_dic
def func_search_keywordlist(platform):
search_body = {"query": {"bool": {"filter": []}}}
search_resp = es_framework.search(index=index_target_releaser,
doc_type=doc_type_target_releaser,
body=search_body,
size=0,
request_timeout=100)
total_hit = search_resp['hits']['total']
releaser_dic = {}
if total_hit > 0:
print('Got %d releaser for platform %s.' % (total_hit, platform))
scan_resp = scan(client=es_framework, query=search_body,
index=index_target_releaser,
doc_type=doc_type_target_releaser,
request_timeout=200)
for line in scan_resp:
try:
title = line['_source']['title']
page = line['_source']['page']
releaser_dic[title] = page
except:
print('error in :', line)
continue
else:
print('Got zero hits.')
return releaser_dic
if OUTPUT_TO_ES_RAW is True:
ES_INDEX = 'test2'
DOC_TYPE = 'doc'
print(ES_INDEX, DOC_TYPE)
res_dic = {}
res_list = ["比基尼线脱毛",
"嗨体泪沟",
"根据脸型选发型",
"圆脸适合什么发型",
"5热玛吉",
"耳软骨假体鼻综合",
"肉毒素去法令纹",
"吸脂瘦腹部",
"嗨体填充泪沟",
"6d小脸针",
"水剥离",
"嗨体去颈纹",
"胶原蛋白填充泪沟",
"吸脂瘦全身",
"肉毒素去狐臭",
"吸脂瘦腰部",
"fotona4d",
"嘴综合",
"胸部下垂矫正",
"5g天使光雕",
"唇综合",
"SVF-gel脂肪胶",
"嘴角上扬术",
"嗨体注射",
"脂肪填充修复",
"比基尼脱毛",
"lams吸脂",
"脂肪填充面部年轻化",
"嗨体",
"吸脂祛副乳",
"m22",
"胸部提升",
"fotona",
"O型腿矫正",
"肋骨鼻",
"欣颜",
"唯颜",
"垫眉骨",
"咬肌切除",
"背部吸脂",
"m22王者之冠",
"bbl",
"胶原蛋白填充祛黑眼圈",
"热玛吉",
"热玛吉5代",
]
for l in res_list:
res_dic[l] = 1
return res_dic
ES_INDEX = 'crawler-data-raw'
print(ES_INDEX)
pages = ARGS.maxpage
for platform in PLATFORM_LIST:
......@@ -92,12 +137,11 @@ for platform in PLATFORM_LIST:
print("search keyword '%s' on platform %s" % (keyword, platform))
search_pages = int(KEYWORD_dic[keyword])
try:
if platform != "腾讯新闻":
crawler.search_page(keyword=keyword,
search_pages_max=search_pages,
output_to_es_raw=OUTPUT_TO_ES_RAW,
output_to_es_register=OUTPUT_TO_ES_REGISTER,
es_index=ES_INDEX,)
crawler.search_page(keyword=keyword,
search_pages_max=search_pages,
output_to_es_raw=OUTPUT_TO_ES_RAW,
output_to_es_register=OUTPUT_TO_ES_REGISTER,
es_index=ES_INDEX,)
except Exception as e:
print(e)
......
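For readability, a condensed, runnable sketch of what the rewritten func_search_keywordlist now does: it no longer scans target releasers from Elasticsearch but returns a fixed keyword-to-page-count map (the full keyword list is in the hunk above and is truncated here; the platform argument is unused, as in the diff).

```python
def func_search_keywordlist(platform):
    res_list = ["比基尼线脱毛", "嗨体泪沟", "热玛吉", "热玛吉5代"]  # truncated keyword list
    return {keyword: 1 for keyword in res_list}

for keyword, pages in func_search_keywordlist("weibo").items():
    print("search keyword '%s' on platform %s, %d page(s)" % (keyword, "weibo", pages))
```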
......@@ -10,7 +10,7 @@
"""
import redis, random
import kdl, requests
import sys
# from redis.sentinel import Sentinel
# sentinel = Sentinel([('192.168.17.65', 26379),
......@@ -23,9 +23,23 @@ import kdl, requests
# slave = sentinel.discover_slaves('ida_redis_master')
# # 连接数据库
# rds = sentinel.master_for('ida_redis_master', socket_timeout=0.5, db=7, decode_responses=True)
rds = redis.StrictRedis(host='172.16.40.164', port=6379, db=18, decode_responses=True, password='ReDis!GmTx*0aN12')
def func_get_redis():
sys_path = sys.path
stats = "on_line"  # assumed default: non-Windows environments are treated as production
for p in sys_path:
if "C:\\" in p:
stats = "test"
break
if stats == "on_line":
rds = redis.StrictRedis(host='172.16.40.164', port=6379, db=18, decode_responses=True, password='ReDis!GmTx*0aN12')
else:
rds = redis.StrictRedis(host='172.18.51.10', port=6379, db=18, decode_responses=True)
return rds
rds = func_get_redis()
def get_proxy_from_redis():
try:
one_proxy = rds.randomkey()
......
......@@ -20,11 +20,11 @@ parser.add_argument('-d', '--days_from_now', default=30, type=int,
'default 30.'))
args = parser.parse_args()
def redis_url_batch_gen(platform, batch_str, release_time_lower_bdr):
url_Lst = pull_url_from_es(platform, release_time_lower_bdr)
if url_Lst != []:
redis_list_name, push_counter = feed_url_into_redis(url_Lst, platform,
batch_str=batch_str)
redis_list_name, push_counter = feed_url_into_redis(url_Lst, platform,)
return (redis_list_name, push_counter)
else:
return (None, None)
......
......@@ -343,8 +343,7 @@ class Crawler_toutiao():
def search_page_old(self, keyword, search_pages_max=12,
output_to_es_raw=False,
output_to_es_register=False,
es_index=None,
doc_type=None,proxies_num=0):
es_index=None,proxies_num=0):
headers_search = {
"accept": "application/json, text/javascript",
"accept-encoding": "gzip, deflate",
......@@ -428,9 +427,10 @@ class Crawler_toutiao():
print("method get_web_article_info error %s" % e)
print(D0)
toutiao_Lst.append(D0)
except KeyError:
except Exception as e:
# It's fine to drop the last returned data value.
# The search API just returns something loosely related to the search
print(e)
continue
else:
break
......@@ -440,7 +440,7 @@ class Crawler_toutiao():
output_to_es_raw=output_to_es_raw,
output_to_es_register=output_to_es_register,
es_index=es_index,
doc_type=doc_type)
)
toutiao_Lst.clear()
if toutiao_Lst != []:
......@@ -449,7 +449,7 @@ class Crawler_toutiao():
output_to_es_raw=output_to_es_raw,
output_to_es_register=output_to_es_register,
es_index=es_index,
doc_type=doc_type)
)
return toutiao_Lst
......@@ -461,7 +461,7 @@ class Crawler_toutiao():
self.search_page_old(keyword, search_pages_max=search_pages_max, output_to_es_raw=output_to_es_raw,
output_to_es_register=output_to_es_register,
es_index=es_index,
doc_type=doc_type, proxies_num=proxies_num)
proxies_num=proxies_num)
def find_releaser_id(self, releaserUrl):
return get_releaser_id(platform=self.platform, releaserUrl=releaserUrl)
......@@ -1799,4 +1799,4 @@ if __name__ == '__main__':
# doc_type='doc',
# releaser_page_num_max=3, proxies_num=1))
# test.releaser_page(u)
test.search_page("热玛吉五代")
test.search_page("比基尼线脱毛")
......@@ -432,6 +432,7 @@ class Crawler_weibo():
video_dic["releaserUrl"] = data["userinfo"].get('url')
video_dic["releaser_id_str"] = "weibo_" + str(video_dic["releaser_id"])
video_dic["img_list"] = re.findall('img src="(.*?)"',data["content"])
video_dic["mid"] = article_id
return video_dic
except Exception as e:
print("single data row formate error %s" % e)
......@@ -442,6 +443,7 @@ class Crawler_weibo():
output_to_es_register=False,
es_index=None,
doc_type=None, proxies_num=0):
count_false = 0
headers_search = {
"Accept": "application/json, text/plain, */*",
"MWeibo-Pwa": "1",
......@@ -463,6 +465,13 @@ class Crawler_weibo():
if get_page.status_code != 200:
continue
page_dict = get_page.json()
while page_dict['data'].get("msg") == '这里还没有内容':
get_page = retry_get_url(search_page_url, headers=headers_search)
page_dict = get_page.json()
count_false += 1
if count_false >= 3:
break  # give up on this results page after three consecutive empty responses
if page_dict['data'].get("cards")[0].get("card_group"):
for one_line in page_dict['data'].get("cards")[0].get("card_group"):
try:
......@@ -488,7 +497,7 @@ class Crawler_weibo():
# D0['play_count'] = play_count
# D0['comment_count'] = comment_count
# D0['favorite_count'] = favorite_count
D0['article_id'] = article_id
D0['mid'] = article_id
# D0['releaser'] = releaser
# D0['releaserUrl'] = releaserUrl
# D0['release_time'] = release_time
......@@ -501,6 +510,7 @@ class Crawler_weibo():
D0.update(article_info)
except Exception as e:
print("method get_web_article_info error %s" % e)
continue
print(D0)
weibo_Lst.append(D0)
except KeyError:
......@@ -850,5 +860,5 @@ if __name__ == '__main__':
# test_search2 = weibo.search_page(keyword, user_name, password)
# test_repost = weibo.repost_page(weibo_id, user_name, password)
# user_page = weibo.user_page(user_id, user_name, password)
weibo.search_page("迪丽热巴")
weibo.search_page("迪丽热巴",output_to_es_register=True,es_index="crawler-data-raw",search_pages_max=1)
# print(user_page)
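A self-contained sketch of the retry pattern the weibo hunk introduces: re-request the search page while the API answers '这里还没有内容' ("nothing here yet"), giving up after three attempts rather than looping indefinitely. retry_get_url and the real URL/headers are replaced with plain requests calls here.

```python
import requests

def fetch_search_page(url, headers, max_retries=3):
    for _ in range(max_retries):
        resp = requests.get(url, headers=headers, timeout=10)
        if resp.status_code != 200:
            continue
        page_dict = resp.json()
        if page_dict.get('data', {}).get('msg') != '这里还没有内容':
            return page_dict
    return None  # caller skips this results page when every retry came back empty
```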
......@@ -15,7 +15,7 @@ import pymysql
import requests
from elasticsearch.exceptions import TransportError
from crawler_sys.framework.redis_interact import feed_url_into_redis
from crawler_sys.framework.redis_interact import rds
from crawler_sys.framework.redis_interact import redis_path
from crawler_sys.framework.es_ccr_index_defination import es_framework as es_site_crawler
from crawler_sys.framework.es_ccr_index_defination import index_url_register
from crawler_sys.framework.es_ccr_index_defination import doc_type_url_register
......@@ -296,7 +296,9 @@ def output_result(result_Lst, platform,
push_to_redis=False,
output_to_test_mysql=False,
output_to_mimas_mysql=False,
es_index=index_site_crawler, **kwargs):
es_index=index_site_crawler,
rds_path="on_line",
**kwargs):
# write data into es crawler-raw index
if output_to_es_raw:
bulk_write_into_es(result_Lst, es_index)
......@@ -308,14 +310,16 @@ def output_result(result_Lst, platform,
index=es_index,
construct_id=True,
platform=platform,
)
if output_to_test_mysql:
pass
# feed url into redis
if push_to_redis:
rds = redis_path(rds_path)
feed_url_into_redis(
result_Lst, expire=kwargs.get("expire"))
result_Lst, expire=kwargs.get("expire"),rds=rds)
# output into file according to passed in parameters
if output_to_file is True and filepath is not None:
......@@ -451,7 +455,7 @@ def load_json_file_into_dict_Lst(filename, path):
return data_Lst
def crawl_a_url_and_update_redis(url, platform, urlhash, processID=-1):
def crawl_a_url_and_update_redis(url, platform, urlhash, rds,processID=-1,):
# find crawler
# perform crawling, get the data
# write es or output to files
......@@ -469,7 +473,7 @@ def crawl_batch_task(url_Lst):
url_info['urlhash'])
def scan_redis_to_crawl():
def scan_redis_to_crawl(rds):
batch_size = 1000
cur = 0
task_batchs = []
......@@ -491,13 +495,13 @@ def scan_redis_to_crawl():
'urlhash': urlhash})
if len(task_batchs) == batch_size:
# multi-processing here
crawl_batch_task(task_batchs)
crawl_batch_task(rds,task_batchs)
task_batchs.clear()
if cur == 0:
break
def remove_fetched_url_from_redis(remove_interval=10):
def remove_fetched_url_from_redis(rds,remove_interval=10):
time.sleep(remove_interval)
cur = 0
delete_counter = 0
......
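A hedged usage sketch of the new output_result signature: the caller names the Redis environment via rds_path, and output_result only resolves a client when push_to_redis is set. The import path and the sample record are assumptions; the keyword arguments mirror the hunk above.

```python
from crawler_sys.utils.output_results import output_result  # assumed import path

sample = [{"url": "https://example.com/article/1", "platform": "weibo"}]  # placeholder record
output_result(sample, "weibo",
              output_to_es_raw=False,
              push_to_redis=True,   # triggers rds = redis_path(rds_path) inside
              rds_path="on_line",   # any other value selects the test instance
              expire=3600)
```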
git+ssh://git@git.wanmeizhensuo.com/backend/gm-types.git@master
# git+ssh://git@git.wanmeizhensuo.com/backend/gm-types.git@master
lxml==4.5.1
requests==2.23.0
tqdm==4.46.1
......
......@@ -5,7 +5,7 @@
# @author : litao
def calculate_douban_id(data_dic):
def calculate_zhihu_id(data_dic):
if "answer" in data_dic["url"]:
return data_dic["_id"].replace("zhihu_","")
else:
......
......@@ -17,6 +17,7 @@ from write_data_into_es.calculate_doc_id.func_calculate_wangyi_news_id import ca
from write_data_into_es.calculate_doc_id.func_calculate_douyin_id import calculate_douyin_id
from write_data_into_es.calculate_doc_id.func_calculate_haokan_video_id import calculate_haokan_id
from write_data_into_es.calculate_doc_id.func_calculate_weibo_id import calculate_weibo_id
from write_data_into_es.calculate_doc_id.func_calculate_zhihu_id import calculate_zhihu_id
from write_data_into_es.calculate_doc_id.func_calculate_douban_id import calculate_douban_id
......@@ -32,7 +33,7 @@ def vid_cal_func(platform):
"haokan":calculate_haokan_id,
"weibo":calculate_weibo_id,
"douban":calculate_douban_id,
"zhihu":calculate_douban_id,
"zhihu":calculate_zhihu_id,
}
def general_vid_cal_func(url):
......
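Finally, a condensed sketch of the corrected doc-id dispatch: "zhihu" now resolves to calculate_zhihu_id instead of reusing the douban calculator. The imports match the hunk above; the return style of vid_cal_func and the sample document are assumptions.

```python
from write_data_into_es.calculate_doc_id.func_calculate_zhihu_id import calculate_zhihu_id
from write_data_into_es.calculate_doc_id.func_calculate_douban_id import calculate_douban_id

def vid_cal_func(platform):
    vid_cal_func_dic = {
        "douban": calculate_douban_id,
        "zhihu": calculate_zhihu_id,
    }
    return vid_cal_func_dic.get(platform)

doc_id = vid_cal_func("zhihu")({"_id": "zhihu_123456",
                                "url": "https://www.zhihu.com/question/1/answer/123456"})
```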