1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# -*- coding: utf-8 -*-
"""
Created on Thu Dec 21 17:52:31 2017
use python2
@author: hanye
"""
from __future__ import print_function
from pandora import api,http,auth
import requests
#from pandora.models import *
#from pandora.utils import *
import json
#import pandas as pd
import datetime
import time
#import thread
#import threading
import logging
#import sys
from func_split_by_shards import split_by_shards
import Queue
def func_retrieve_data_from_qiniu(thread_id, loggerName, fetch_time_start_ts, fetch_time_end_ts,
                   data_queue):
    """Fetch log records from qiniu logdb for a fetch_time range and queue them.

    Issues an Elasticsearch-style msearch query against qiniu's logdb for all
    documents whose ``fetch_time`` falls in ``[fetch_time_start_ts,
    fetch_time_end_ts)``, then pushes each hit's ``_source`` dict into
    ``data_queue`` for a downstream consumer.

    Parameters:
        thread_id           -- integer tag used only in log lines.
        loggerName          -- parent logger name; this function logs under
                               '<loggerName>.func_retrieve'.
        fetch_time_start_ts -- range start, epoch milliseconds (inclusive).
        fetch_time_end_ts   -- range end, epoch milliseconds (exclusive).
        data_queue          -- Queue.Queue the retrieved documents are put into.

    Returns:
        0 on success (also when the hit count exceeds qiniu's retrieval limit
        and the function bails out early); -1 when the connection could not be
        established or a page request ultimately failed.
    """
    endpoint = 'https://logdb.qiniu.com'
    url='/v5/logdbkibana/msearch'
    # NOTE(review): credentials are hard-coded in source; they should be moved
    # to a config file or environment variables and rotated.
    ak='UdtK_JT7yhln0-yA0-a2I96s497c_rwl-jC7Fikz'
    sk='mRjiaujBTd_P7TxvE__25Ryx62qFjWH9cBzHNC6y'
    headers=None
    method='POST'
    qiniu_retrieving_limit=10000
    # create logger as a child of the caller-supplied logger
    logger=logging.getLogger('%s.func_retrieve' % loggerName)
    logger.info('[ %d ] log starts' % thread_id)
    logger.info('[ %d ] fetch_time_start_ts: %s, ' % (thread_id, str(fetch_time_start_ts))
          +'fetch_time_end_ts: %s ' % str(fetch_time_end_ts)
          )
    # First query with size 0 only estimates the total hits for the range;
    # from/to use string "true"/"false" flags as the logdb API expects.
    query_body={
      "query": {
        "bool": {
          "must": [
            {"range": {"fetch_time": {"from": fetch_time_start_ts,
                         "to": fetch_time_end_ts,
                         "include_lower": "true",
                         "include_upper": "false"}}}
          ]
        }
      },
      "size": 0,
      "from": 0,
    }
    # msearch body format: one header line naming the index, then the query
    json_body_query_outside='{"index":["csmvoide"]}\n'+json.dumps(query_body)
    # client claim should be near request clause, it seems qiniu has a relatively short
    # keep-alive time
    def retrieve_qiniu(method, endpoint, url, json_body, ak, sk, headers=None ):
        # Build and sign the request with qiniu's auth scheme, then send it
        # with retries. Returns the requests.Response, or None after 100
        # consecutive connection failures.
        req=http.Request(method, endpoint+url, json_body, headers)
        req_auth = auth.Auth(ak, sk)
        req_auth.sign_request(req)
        session=requests.Session()
        retry_c=0
        while retry_c<100:
            try:
                resp=session.request(req.method, req.url, data=req.data, headers=req.headers, stream=True, timeout=100)
                break
            # narrowed from a bare except: only network/HTTP-level errors are
            # retryable; anything else (e.g. KeyboardInterrupt) propagates
            except requests.exceptions.RequestException:
                retry_c+=1
                logger.info('[ %d ] got exception when _do_request, sleep for 10 seconds to retry %d' % (thread_id, retry_c))
                time.sleep(10)
        if retry_c==100:
            logger.info('[ %d ] Failed to establish connection after %d retries, return -1, %s' % (thread_id, retry_c, datetime.datetime.now()))
            return None
        return resp
    resp=retrieve_qiniu(method, endpoint, url, json_body_query_outside, ak, sk, headers=headers)
    # BUG FIX: the original called resp.json() unconditionally, which blows up
    # when the helper reports connection failure instead of a Response.
    if resp is None:
        return -1
    re_dict=resp.json()
    data_total=re_dict['responses'][0]['hits']['total']
    logger.info('[ %d ] overall hits for passed fetch_time range is %d' % (thread_id, data_total))
    if data_total>qiniu_retrieving_limit:
        # qiniu caps from+size pagination; the caller is expected to narrow the
        # time range (see split_by_shards) rather than paginate past the cap.
        logger.info('[ %d ] over limit of qiniu, exits.' % thread_id)
        return 0
    data_collector=[]
    size=10000
    # ceiling division for the page count
    if data_total%size==0:
        page_total=data_total//size
    else:
        page_total=data_total//size+1
    for page in range(0, page_total):
        query_body['size']=size
        query_body['from']=page*size
        json_body_query='{"index":["csmvoide"]}\n'+json.dumps(query_body)
        resp=retrieve_qiniu(method, endpoint, url, json_body_query, ak, sk, headers=headers)
        if resp is None:
            # connection gave out mid-pagination; abandon this range
            return -1
        re_dict=resp.json()
        data_raw_Lst=re_dict['responses'][0]['hits']['hits']
        logger.info('[ %d ] page %d/%d, len of data_raw_Lst %d' % (thread_id, page+1, page_total, len(data_raw_Lst)))
        for line in data_raw_Lst:
            data_collector.append(line['_source'])
    # Hand everything over to the consumer; back off 30s whenever the queue is
    # full (checked both before and, via Queue.Full, at the moment of put).
    for line in data_collector:
        retry_for_full=0
        while True:
            if not data_queue.full(): # if queue is not full, put data in
                try:
                    data_queue.put(line, block=False)
                    break
                except Queue.Full:
                    time.sleep(30)
                    retry_for_full+=1
                    logger.info('[ %d ] retry_for_full %d, put failed after putting data. Will try putting again after 30 seconds.' %(thread_id, retry_for_full))
            else:
                time.sleep(30)
                retry_for_full+=1
                logger.info('[ %d ] retry_for_full %d, queue is full before putting data into. Will retry after 30 seconds.' %(thread_id, retry_for_full))
    logger.info('[ %d ] thread exits %s' %(thread_id, datetime.datetime.now()))
    return 0
if __name__=='__main__':
    # Smoke test: fetch a 100-second window starting 2017-12-01 00:00:00 local.
    print( 'test fetch starts', datetime.datetime.now() )
    # make the module logger actually emit (getLogger alone has no handler)
    logging.basicConfig(level=logging.INFO)
    # add 8 hours to deal with UTC things; timestamps are epoch milliseconds
    fetch_time_start_ts=int(time.mktime(datetime.datetime.strptime('2017-12-01T00:00:00', '%Y-%m-%dT%H:%M:%S').timetuple())*1e3+8*3600*1e3)
    fetch_time_end_ts=int(fetch_time_start_ts+100*1e3)
    # BUG FIX: the original call passed (start_ts, end_ts, 10), which does not
    # match the signature (thread_id, loggerName, start_ts, end_ts, data_queue)
    # and raised TypeError. Supply all five arguments in the right order.
    test_queue=Queue.Queue()
    func_retrieve_data_from_qiniu(0, 'test_fetch', fetch_time_start_ts, fetch_time_end_ts, test_queue)
    print( 'test fetch done, queue size', test_queue.qsize(), datetime.datetime.now() )