# -*- coding: utf-8 -*-
"""
Created on Wed Jun 20 08:54:21 2018
@author: hanye
"""
import datetime
import sys
from elasticsearch.helpers import scan
from elasticsearch import Elasticsearch
es = Elasticsearch(hosts='192.168.17.11', port=9200)
## when testing, use es_hy_sign_in instead
#from es_hy_sign_in import es
from func_find_week_num import find_week_belongs_to
from func_general_bulk_write import bulk_write_short_video
def define_doc_type(week_year, week_no, week_day_start):
    """
    doc_type = 'daily-url-2018_w24_s2' means Tuesday is selected as the
    first day of each week, and the week is the 24th week of year 2018.
    In the isocalendar definition, Monday is weekday 1, Tuesday is weekday 2,
    ..., Saturday is weekday 6, Sunday is weekday 7.
    """
    doc_type_str = 'daily-url-%d_w%02d_s%d' % (week_year, week_no, week_day_start)
    return doc_type_str
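
# A quick illustration of the naming scheme described in the docstring above:
#   define_doc_type(2018, 24, 2)  ->  'daily-url-2018_w24_s2'   (Tuesday-start week)
#   define_doc_type(2018, 24, 1)  ->  'daily-url-2018_w24_s1'   (Monday-start week)
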
def update_weekly_datapool(fetch_time_start_T,
                           fetch_time_end_T=None,
                           index_source='short-video-production',
                           index_dest='short-video-weekly',
                           week_day_start=1,
                           f_log=sys.stdout):
    doc_type_daily = 'daily-url'
    if fetch_time_end_T is None:
        fetch_time_end_T = fetch_time_start_T + datetime.timedelta(days=1)
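    # Only a window of at most one day is processed directly; a longer window
    # is split below and handled recursively, one day at a time.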
    if (fetch_time_end_T - fetch_time_start_T).days <= 1:
        week_year, week_no, week_day = find_week_belongs_to(fetch_time_start_T, week_day_start)
        doc_type_data_pool = define_doc_type(week_year, week_no, week_day_start=week_day_start)
        fetch_time_start_ts = int(fetch_time_start_T.timestamp()*1e3)
        fetch_time_end_ts = int(fetch_time_end_T.timestamp()*1e3)
        release_time_start_T = fetch_time_start_T - datetime.timedelta(days=30)
        release_time_start_ts = int(release_time_start_T.timestamp()*1e3)
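        # The bool filter below keeps documents whose fetch_time falls inside
        # [fetch_time_start_T, fetch_time_end_T) and whose release_time lies
        # between 30 days before the window start and the window end, so only
        # recently released videos enter the weekly data pool.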
        find_daily_data_bd = {
            "query": {
                "bool": {
                    "filter": [
                        {"range": {"fetch_time": {"gte": fetch_time_start_ts,
                                                  "lt": fetch_time_end_ts}}},
                        {"range": {"release_time": {"gte": release_time_start_ts,
                                                    "lt": fetch_time_end_ts}}}
                    ]
                }
            }
        }
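        # A size=0 search is issued first only to learn how many documents
        # match, so progress can be reported while the scroll scan below
        # streams the actual documents.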
        search_resp = es.search(index=index_source, doc_type=doc_type_daily,
                                body=find_daily_data_bd, size=0,
                                request_timeout=100)
        hit_total = search_resp['hits']['total']
        print('Got %d results in %s/%s on fetch_time range [%s, %s)'
              % (hit_total, index_source, doc_type_daily,
                 fetch_time_start_T, fetch_time_end_T),
              datetime.datetime.now(), file=f_log)
        if hit_total > 0:
            print('Will write into %s/%s' % (index_dest, doc_type_data_pool),
                  datetime.datetime.now(), file=f_log)
            scan_resp = scan(client=es, index=index_source, doc_type=doc_type_daily,
                             query=find_daily_data_bd, request_timeout=300)
            line_counter = 0
            data_Lst = []
            for line in scan_resp:
                line_counter += 1
                line_d = line['_source']
                data_Lst.append(line_d)
                if line_counter % 1000 == 0 or line_counter == hit_total:
                    print('Writing lines %d/%d [%.2f%%]'
                          % (line_counter, hit_total, line_counter/hit_total*100),
                          datetime.datetime.now(), file=f_log)
                    bulk_write_resp = bulk_write_short_video(data_Lst, client=es,
                                                             index=index_dest,
                                                             doc_type=doc_type_data_pool,
                                                             doc_id_type='all-time-url',
                                                             f_log=f_log)
                    data_Lst.clear()
        else:
            print('Got zero hits, program exits.', datetime.datetime.now(), file=f_log)
    else:
        print('Passing a fetch_time range of more than 1 day is NOT recommended.',
              datetime.datetime.now(), file=f_log)
        print('fetch_time_end_T is more than 1 day after fetch_time_start_T; '
              'the range will be processed recursively in 1-day intervals.',
              datetime.datetime.now(), file=f_log)
        fetch_time_end_T_ori = fetch_time_end_T
        fetch_time_end_T_alter = fetch_time_start_T + datetime.timedelta(days=1)
        print('Altering fetch_time_end_T to be 1 day later than fetch_time_start_T.',
              datetime.datetime.now(), file=f_log)
        update_weekly_datapool(fetch_time_start_T,
                               fetch_time_end_T_alter,
                               index_source=index_source,
                               index_dest=index_dest,
                               week_day_start=week_day_start,
                               f_log=f_log)
        print('Processing data after the altered fetch_time_end_T.',
              datetime.datetime.now(), file=f_log)
        fetch_time_start_T_alter = fetch_time_end_T_alter
        fetch_time_end_T_alter_n = fetch_time_end_T_ori
        update_weekly_datapool(fetch_time_start_T_alter,
                               fetch_time_end_T_alter_n,
                               index_source=index_source,
                               index_dest=index_dest,
                               week_day_start=week_day_start,
                               f_log=f_log)
# test
if __name__ == '__main__':
    update_weekly_datapool(datetime.datetime(2018, 6, 18),
                           datetime.datetime(2018, 6, 18, 0, 5, 0),
                           # index_dest='test-v2',
                           week_day_start=1)
    week_year1, week_no1, week_day1 = find_week_belongs_to(datetime.datetime(2018, 6, 18), 1)
    print(define_doc_type(week_year1, week_no1, 1))
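
# A hypothetical multi-day invocation (not part of the original test): a range
# wider than one day is split by the else branch above and replayed day by day,
# so the following would trigger seven consecutive 1-day runs.
#
#     update_weekly_datapool(datetime.datetime(2018, 6, 18),
#                            datetime.datetime(2018, 6, 25),
#                            week_day_start=1)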