# -*- coding: utf-8 -*- """ Created on Thu Dec 21 17:52:31 2017 use python2 @author: hanye """ from __future__ import print_function from pandora import api,http,auth import requests #from pandora.models import * #from pandora.utils import * import json #import pandas as pd import datetime import time #import thread #import threading import logging #import sys from func_split_by_shards import split_by_shards import Queue def func_retrieve_data_from_qiniu(thread_id, loggerName, fetch_time_start_ts, fetch_time_end_ts, data_queue): endpoint = 'https://logdb.qiniu.com' url='/v5/logdbkibana/msearch' ak='UdtK_JT7yhln0-yA0-a2I96s497c_rwl-jC7Fikz' sk='mRjiaujBTd_P7TxvE__25Ryx62qFjWH9cBzHNC6y' headers=None method='POST' qiniu_retrieving_limit=10000 # f_log=sys.stdout # creater logger logger=logging.getLogger('%s.func_retrieve' % loggerName) logger.info('[ %d ] log starts' % thread_id) logger.info('[ %d ] fetch_time_start_ts: %s, ' % (thread_id, str(fetch_time_start_ts)) +'fetch_time_end_ts: %s ' % str(fetch_time_end_ts) ) # estimate all hits from the given conditions query_body={ "query": { "bool": { "must": [ {"range": {"fetch_time": {"from": fetch_time_start_ts, "to": fetch_time_end_ts, "include_lower": "true", "include_upper": "false"}}} ] } }, "size": 0, "from": 0, } json_body_query_outside='{"index":["csmvoide"]}\n'+json.dumps(query_body) # client claim should be near request clause, it seems qiniu has a relatively short # keep-alive time def retrieve_qiniu(method, endpoint, url, json_body, ak, sk, headers=None ): req=http.Request(method, endpoint+url, json_body, headers) req_auth = auth.Auth(ak, sk) req_auth.sign_request(req) session=requests.Session() retry_c=0 while retry_c<100: try: resp=session.request(req.method, req.url, data=req.data, headers=req.headers, stream=True, timeout=100) break except: retry_c+=1 logger.info('[ %d ] got exception when _do_request, sleep for 10 seconds to retry %d' % (thread_id, retry_c)) time.sleep(10) if retry_c==100: logger.info('[ %d ] Failed to establish connection after %d retries, return -1, %s' % (thread_id, retry_c, datetime.datetime.now())) return -1 return resp resp=retrieve_qiniu(method, endpoint, url, json_body_query_outside, ak, sk, headers=headers) re_dict=resp.json() data_total=re_dict['responses'][0]['hits']['total'] logger.info('[ %d ] overall hits for passed fetch_time range is %d' % (thread_id, data_total)) if data_total>qiniu_retrieving_limit: logger.info('[ %d ] over limit of qiniu, exists.' % thread_id) return 0 data_collector=[] size=10000 if data_total%size==0: page_total=data_total//size else: page_total=data_total//size+1 for page in range(0, page_total): query_body['size']=size query_body['from']=page*size json_body_query='{"index":["csmvoide"]}\n'+json.dumps(query_body) resp=retrieve_qiniu(method, endpoint, url, json_body_query, ak, sk, headers=headers) re_dict=resp.json() data_raw_Lst=re_dict['responses'][0]['hits']['hits'] logger.info('[ %d ] page %d/%d, len of data_raw_Lst %d' % (thread_id, page+1, page_total, len(data_raw_Lst))) for line in data_raw_Lst: data_collector.append(line['_source']) for line in data_collector: retry_for_full=0 while True: if not data_queue.full(): # if queue is not full, put data in try: data_queue.put(line, block=False) break except Queue.Full: time.sleep(30) retry_for_full+=1 logger.info('[ %d ] retry_for_full %d, put failed after putting data. Will try putting again after 30 seconds.' %(thread_id, retry_for_full)) else: time.sleep(30) retry_for_full+=1 logger.info('[ %d ] retry_for_full %d, queue is full before putting data into. Will retry after 30 seconds.' %(thread_id, retry_for_full)) logger.info('[ %d ] thread exists %s' %(thread_id, datetime.datetime.now())) return 0 if __name__=='__main__': print( 'test fetch starts', datetime.datetime.now() ) # add 8 hours to deal with UTC things fetch_time_start_ts=int(time.mktime(datetime.datetime.strptime('2017-12-01T00:00:00', '%Y-%m-%dT%H:%M:%S').timetuple())*1e3+8*3600*1e3) fetch_time_end_ts=int(fetch_time_start_ts+100*1e3) func_retrieve_data_from_qiniu(fetch_time_start_ts, fetch_time_end_ts, 10)