# -*- coding: utf-8 -*-
"""
Created on Tue Dec 4 14:00:03 2018

@author: fangyucheng

Video platform search-page crawler driver.

For every platform passed with ``-p``, read ``{keyword: max_page}`` pairs
from the ``search_word`` Elasticsearch index and run that platform crawler's
``search_page`` for each keyword in a process pool.
"""
import argparse
import configparser
import random
from concurrent.futures.process import ProcessPoolExecutor

from elasticsearch.helpers import scan
from elasticsearch import Elasticsearch

from crawler.crawler_sys.framework.platform_crawler_register import get_crawler


def _str2bool(value):
    """Convert a command-line string such as 'True', 'false' or '1' to a bool.

    argparse hands option values over as plain strings, so without this
    converter ``-w False`` would produce the (truthy) string ``'False'``.

    :raises argparse.ArgumentTypeError: if ``value`` is not a recognised
        boolean spelling.
    """
    if isinstance(value, bool):
        return value
    lowered = str(value).strip().lower()
    if lowered in ('true', 't', 'yes', 'y', '1'):
        return True
    if lowered in ('false', 'f', 'no', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError(
        'boolean value expected, got %r' % (value,))


PARSER = argparse.ArgumentParser(description='video platform search page crawler')
PARSER.add_argument('-p', '--platform', default=[], action='append',
                    help=('legal platform name is required'))
# NOTE(review): parsed but not referenced anywhere in this script.
PARSER.add_argument('-k', '--key_word_platform', default=[], action='append',
                    help=('key_word_legal platform name is required'))
PARSER.add_argument('-w', '--output_to_es_raw', default=False, type=_str2bool,
                    help=('output to es raw'))
PARSER.add_argument('-g', '--output_to_es_register', default=True,
                    type=_str2bool, help=('output to es register'))
PARSER.add_argument('-n', '--maxpage', default=20, type=int,
                    help=('maxpage'))
PARSER.add_argument('-px', '--proxies_num', default=3, type=int,
                    help=('proxies_num'))
ARGS = PARSER.parse_args()

OUTPUT_TO_ES_RAW = ARGS.output_to_es_raw
OUTPUT_TO_ES_REGISTER = ARGS.output_to_es_register
PLATFORM_LIST = ARGS.platform
proxies_num = ARGS.proxies_num


def func_search_keywordlist(platform, es_host='172.16.32.37', es_port=9200,
                            index='search_word'):
    """Return ``{title: max_page}`` for every document in the keyword index.

    :param platform: platform name. Currently NOT used for filtering: the
        query's bool filter is empty, so every document in ``index`` is
        returned regardless of platform.
    :param es_host: Elasticsearch host to read keywords from.
    :param es_port: Elasticsearch port.
    :param index: name of the index holding the keyword documents.
    :return: dict mapping each document's ``title`` to its ``max_page``.
        ``max_page`` is ``None`` when the document lacks that field.
        Documents without a ``title`` are skipped with a message.
    """
    es_framework = Elasticsearch(hosts=es_host, port=es_port)
    search_body = {
        "query": {
            "bool": {
                "filter": [
                ]
            }
        }
    }
    return_dict = {}
    for res in scan(es_framework, query=search_body, index=index):
        source = res.get("_source", {})
        title = source.get("title")
        if title is None:
            print('skip keyword document without title:', res.get("_id"))
            continue
        return_dict[title] = source.get("max_page")
    return return_dict


ES_INDEX = 'crawler-data-raw'
# Number of worker processes used per platform.
MAX_WORKERS = 3
# Fallback page count for keywords whose stored max_page is missing/invalid.
pages = ARGS.maxpage


def _resolve_max_page(raw_value, default_pages):
    """Return ``raw_value`` as an int, or ``default_pages`` if it cannot be."""
    try:
        return int(raw_value)
    except (TypeError, ValueError):
        return default_pages


def main():
    """Crawl the search pages of every keyword on every requested platform."""
    print(ES_INDEX)
    for platform in PLATFORM_LIST:
        initialize_crawler = get_crawler(platform)
        crawler = initialize_crawler()
        keyword_dic = func_search_keywordlist(platform)
        futures = {}
        # Leaving the with-block waits for every submitted task, like the
        # previous executor.shutdown(True).
        with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
            for keyword, raw_max_page in keyword_dic.items():
                print("search keyword '%s' on platform %s" % (keyword, platform))
                search_pages = _resolve_max_page(raw_max_page, pages)
                future = executor.submit(
                    crawler.search_page, keyword,
                    search_pages_max=search_pages,
                    output_to_es_raw=OUTPUT_TO_ES_RAW,
                    output_to_es_register=OUTPUT_TO_ES_REGISTER,
                    es_index=ES_INDEX, proxies_num=proxies_num)
                futures[future] = keyword
        # Worker exceptions are stored on the futures; surface them instead
        # of letting failed keywords disappear silently.
        for future, keyword in futures.items():
            error = future.exception()
            if error is not None:
                print("search keyword '%s' on platform %s failed: %r"
                      % (keyword, platform, error))


# The guard keeps spawn-based multiprocessing workers, which re-import the
# main module, from re-running the whole crawl.
if __name__ == '__main__':
    main()