# -*- coding: utf-8 -*-
"""
Created on Sun May 27 23:28:45 2017
@author: hanye
"""
import datetime
import time
import sys
import json
import logging

from elasticsearch import Elasticsearch
hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)
task_index = 'task-stats'
task_index_doc_type = 'doc'
def get_task_description(path=None, filename=None):
    if path is None:
        path = '/home/hanye/project_data/Python/hyClass'
    if not path.endswith('/'):
        path += '/'
    if filename is None:
        filename = 'task_description_dict'
    try:
        with open(path + filename, 'r', encoding='utf-8') as fr:
            task_desc_dict = json.load(fr)
        return task_desc_dict
    except (OSError, json.JSONDecodeError):
        # description file missing or unreadable; callers must handle None
        return None
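# For reference, the description file is assumed to be a plain JSON object
# mapping task names to human-readable descriptions. The keys below are
# hypothetical examples, not actual task names:
#
#   {
#       "some_daily_task": "Aggregates the previous day's fetch results.",
#       "another_task": "Builds the monthly summary index."
#   }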
def form_task_doc_id(task_name, day_str):
doc_id = '%s_%s' % (task_name, day_str)
return doc_id
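# e.g. form_task_doc_id('some_task', '2017-05-27') -> 'some_task_2017-05-27'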
def record_task_stats(task_name, program_file_name, task_freq,
start_time, end_time=None,
time_of_processed_data=None,
is_done=False, task_stats=None,
predecessor_task_id=None,
predecessor_task_name=None,
predecessor_task_done_time=None,
with_task_description=True,
task_desc=None,
):
"""
task_freq should be one of 'daily', 'monthly' or 'temp'.
task_start and task_end should be millisecond-unit timestamp for local time.
time_of_processed_data describes the target date for the processed data
(usually fetch_time, can be others), should be
a timestamp of int number in millisecond, if not passed,
will default to the time when running this program.
is_done should be True of False, describe if the task to be record has been
done.
task_stats default to None, is a string to describe task status
for human read, can be other string when is_done==False.
If this task must performs AFTER another task finished, the predecessor one
should be recorded by predecessor_task_id, predecessor_task_name and
predecessor_task_done_time, which are all default to None.
This function is supposed to be called twice at least for each tracked task:
First when the task starts, record the task_start, leave task_end=None,
and is_done=False. Second time, when the task ends, record task_end and shift
is_done to be True.
Ordinary tasks have been described in detail, these description will be added
into stats by default. When calling this method, the description can be turned
off explictly by passing with_task_description=False.
task_desc is the user-define task description, if given not None, will overwrite
defaulted description no matter what's passed in by with_task_description.
"""
    if time_of_processed_data is None:
        time_of_processed_data = int(datetime.datetime.now().timestamp()*1e3)
    day_str = datetime.datetime.fromtimestamp(
            time_of_processed_data/1e3).isoformat()[:10]
    doc_id = form_task_doc_id(task_name, day_str)
    ts_now = int(datetime.datetime.now().timestamp()*1e3)
    data_d = {'task_name': task_name,
              'program_file_name': program_file_name,
              'task_freq': task_freq,
              'start_time': start_time,
              'time_of_processed_data': time_of_processed_data,
              'is_done': is_done,
              'timestamp': ts_now,
              }
    if end_time is not None:
        data_d['end_time'] = end_time
        data_d['task_exec_time'] = end_time - start_time
    if task_stats is not None:
        # note the field is named 'task_status' although the parameter
        # is called task_stats
        data_d['task_status'] = task_stats
    if predecessor_task_id is not None:
        data_d['predecessor_task_id'] = predecessor_task_id
    if predecessor_task_name is not None:
        data_d['predecessor_task_name'] = predecessor_task_name
    if predecessor_task_done_time is not None:
        data_d['predecessor_task_done_time'] = predecessor_task_done_time
    if with_task_description:
        # get_task_description may return None if the description file
        # is missing or malformed
        task_desc_dict = get_task_description()
        if task_desc_dict is not None and task_name in task_desc_dict:
            data_d['task_description'] = task_desc_dict[task_name]
    if task_desc is not None:
        # a user-supplied description always wins over the default one
        data_d['task_description'] = task_desc
    es.index(id=doc_id, index=task_index, doc_type=task_index_doc_type,
             body=data_d,
             request_timeout=100)
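# A minimal usage sketch of the two-call pattern described in the docstring.
# The task and file names below are hypothetical:
#
#   start_ts = int(datetime.datetime.now().timestamp()*1e3)
#   record_task_stats('some_task', 'some_task.py', 'daily',
#                     start_time=start_ts, task_stats='Running')
#   ...  # do the actual work
#   end_ts = int(datetime.datetime.now().timestamp()*1e3)
#   record_task_stats('some_task', 'some_task.py', 'daily',
#                     start_time=start_ts, end_time=end_ts,
#                     is_done=True, task_stats='Done')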
def get_task_record(task_id):
    try:
        get_resp = es.get(index=task_index, doc_type=task_index_doc_type,
                          id=task_id, request_timeout=100)
        if get_resp['found']:
            task_stats_data = get_resp['_source']
            task_stats_data['_id'] = get_resp['_id']
            return task_stats_data
        else:
            return None
    except Exception:
        print('Failed to get stats data for task_id %s' % task_id)
        return None
def is_task_done(task_id):
    # get_task_record already swallows lookup errors and returns None
    task_doc = get_task_record(task_id)
    if task_doc is None:
        return False
    return task_doc.get('is_done', False)
def is_task_started(task_id):
    return get_task_record(task_id) is not None
def find_task_record(task_name, day_str_of_processed_data):
    if '.py' in task_name:
        # strip the .py suffix to recover task_name, in case a program
        # filename was passed instead of a task name
        task_name = task_name.split('.py')[0]
    task_id = form_task_doc_id(task_name, day_str_of_processed_data)
    return get_task_record(task_id)
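# e.g. find_task_record('some_task.py', '2017-05-27') looks up the record
# with doc_id 'some_task_2017-05-27' (the task name here is hypothetical).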
def wait_for_pre_task_to_cal(pre_task_names, loggerName,
cal_func,
cal_func_params,
task_name, program_file_name,
time_of_processed_data_T,
cal_func_kwargs=None,
f_log=None,
task_freq='daily'):
"""
Logging method is defaulted to use python's logging utility,
but if f_log is given (that's f_log!=None, should be a
file object), log message will fall back to use print
with file=f_log. Whenever f_log is not None, whatever passed
into loggerName will be ignored.
cal_func_params is a tuple, cannot be omitted. If a one-element
tuple is going to be passed in, say arg1 is the only element,
should passing in like cal_func_params=(arg1,)
Notice the comma behind arg1 should NOT be omitted, or
will be mis-interpreted.
cal_func_kwargs is a dict, can be omitted.
"""
    def log_func(log_msg, f_log):
        if f_log is None and loggerName is not None:
            logger = logging.getLogger('%s.wait_for_pre_task' % loggerName)
            logger.info(log_msg)
        elif f_log is None and loggerName is None:
            print(log_msg, datetime.datetime.now())
        else:
            print(log_msg, datetime.datetime.now(), file=f_log)
pre_task_done = []
for tsknm in pre_task_names:
task_id = form_task_doc_id(tsknm,
pre_task_names[tsknm]['days_str_of_proc_data'])
pre_task_done.append(is_task_done(task_id))
pre_task_names[tsknm]['task_id'] = task_id
# sort to get consistent order in predecessor_task_id,
# predecessor_task_name and predecessor_task_done_time fields
predecessor_task_name_Lst = sorted(list(pre_task_names.keys()))
predecessor_task_id_Lst = []
for tnm in predecessor_task_name_Lst:
predecessor_task_id_Lst.append(pre_task_names[tnm]['task_id'])
wait_start_T = datetime.datetime.now()
wait_start_ts = int(wait_start_T.timestamp()*1e3)
    while not all(pre_task_done):
        log_func('Not all predecessor tasks are done, waiting...', f_log)
        time.sleep(60)
        pre_task_done.clear()
        undone_tasks = []
        for tsknm in pre_task_names:
            task_id = pre_task_names[tsknm]['task_id']
            is_done = is_task_done(task_id)
            pre_task_done.append(is_done)
            if not is_done:
                undone_tasks.append(task_id)
        wait_present_T = datetime.datetime.now()
        undone_tasks_str = ','.join(undone_tasks)
        # write task-stats while the task is waiting
        record_task_stats(
            task_name=task_name,
            program_file_name=program_file_name,
            task_freq=task_freq,
            # while waiting, start_time holds the wait start time
            start_time=wait_start_ts,
            time_of_processed_data=int(time_of_processed_data_T.timestamp()*1e3),
            predecessor_task_id=predecessor_task_id_Lst,
            predecessor_task_name=predecessor_task_name_Lst,
            task_stats='Waiting for task %s to finish.' % undone_tasks_str
            )
        if (wait_present_T - wait_start_T).total_seconds() > 24*3600:
            log_func('Have waited for 24 hours and not all predecessor tasks '
                     'are done, program exits.', f_log)
            sys.exit(0)
    log_func('All predecessor tasks are done.', f_log)
for tsknm in pre_task_names:
task_id = pre_task_names[tsknm]['task_id']
task_stats_data = get_task_record(task_id)
done_at = task_stats_data['end_time']
time_of_processed_data = task_stats_data['time_of_processed_data']
time_of_processed_data_str = datetime.datetime.fromtimestamp(time_of_processed_data/1e3).isoformat()[:10]
pre_task_names[tsknm]['done_at'] = done_at
done_at_str = datetime.datetime.fromtimestamp(done_at/1e3).isoformat()
log_func('Task %s for %s has been done at %s' % (tsknm,
time_of_processed_data_str, done_at_str), f_log)
log_func('Will perform %s' % cal_func.__name__, f_log)
# write task status into task-stats index when starts
task_start_ts = int(datetime.datetime.now().timestamp()*1e3)
predecessor_task_done_time_Lst = []
for tnm in predecessor_task_name_Lst:
predecessor_task_done_time_Lst.append(pre_task_names[tnm]['done_at'])
record_task_stats(
task_name=task_name,
program_file_name=program_file_name,
task_freq=task_freq,
start_time=task_start_ts,
time_of_processed_data=int(time_of_processed_data_T.timestamp()*1e3),
predecessor_task_id=predecessor_task_id_Lst,
        predecessor_task_name=predecessor_task_name_Lst,
predecessor_task_done_time=predecessor_task_done_time_Lst
)
log_func('Will execute function %s with parameter %s'
% (cal_func.__name__, cal_func_params), f_log)
if cal_func_kwargs is None:
cal_func(*cal_func_params)
else:
cal_func(*cal_func_params, **cal_func_kwargs)
# write task status into task-stats index when ends
task_end_ts = int(datetime.datetime.now().timestamp()*1e3)
record_task_stats(
task_name=task_name,
program_file_name=program_file_name,
task_freq=task_freq,
start_time=task_start_ts,
time_of_processed_data=int(time_of_processed_data_T.timestamp()*1e3),
predecessor_task_id=predecessor_task_id_Lst,
        predecessor_task_name=predecessor_task_name_Lst,
predecessor_task_done_time=predecessor_task_done_time_Lst,
end_time=task_end_ts,
is_done=True,
task_stats='Done',
)
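# A minimal usage sketch; all names below are hypothetical. pre_task_names
# maps each predecessor task name to a dict holding the day string of its
# processed data:
#
#   pre_tasks = {'upstream_fetch': {'days_str_of_proc_data': '2017-05-27'}}
#   wait_for_pre_task_to_cal(pre_tasks, 'myLogger',
#                            cal_func=some_calculation,
#                            cal_func_params=('2017-05-27',),
#                            task_name='downstream_calc',
#                            program_file_name='downstream_calc.py',
#                            time_of_processed_data_T=datetime.datetime(2017, 5, 27))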
def update_task_record_fields(task_id, field_name_value_dict):
    """
    GET the doc, keep it in memory, update the given fields, and PUT the
    whole doc back into es.
    Rather than using the _update API, this PUTs the whole doc directly.
    The consideration is that the _update API in elasticsearch relies on
    painless scripting, which may evolve heavily between versions, making
    forward compatibility hard to maintain.
    """
    if field_name_value_dict:
        task_dict = get_task_record(task_id)
        if task_dict is None:
            print('No task record found for task_id %s' % task_id)
            return
        task_dict.update(field_name_value_dict)
        # the request body must NOT contain the _id field
        task_dict.pop('_id', None)
        data_str = json.dumps(task_dict)
        update_resp = es.index(index=task_index, doc_type=task_index_doc_type,
                               id=task_id, body=data_str)
        print('update_resp: %s' % update_resp)
    else:
        print('Empty field_name_value_dict!')
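# A minimal usage sketch; the doc id and field values are hypothetical:
#
#   update_task_record_fields('some_task_2017-05-27',
#                             {'task_status': 'Re-run requested'})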