# -*- coding: utf-8 -*-
"""
Created on Sun May 27 23:28:45 2017

@author: hanye
"""
import datetime
import time
import logging
import json
import sys

from elasticsearch import Elasticsearch

hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)
task_index = 'task-stats'
task_index_doc_type = 'doc'


def get_task_description(path=None, filename=None):
    """
    Load the task description dict from a JSON file on disk.
    Returns None if the file is missing or is not valid JSON.
    """
    if path is None:
        path = '/home/hanye/project_data/Python/hyClass'
    if path[-1] != '/':
        path += '/'
    if filename is None:
        filename = 'task_description_dict'
    try:
        with open(path + filename, 'r', encoding='utf-8') as fr:
            task_desc_dict = json.load(fr)
        return task_desc_dict
    except (OSError, ValueError):
        return None


def form_task_doc_id(task_name, day_str):
    doc_id = '%s_%s' % (task_name, day_str)
    return doc_id


def record_task_stats(task_name, program_file_name, task_freq, start_time,
                      end_time=None,
                      time_of_processed_data=None,
                      is_done=False,
                      task_stats=None,
                      predecessor_task_id=None,
                      predecessor_task_name=None,
                      predecessor_task_done_time=None,
                      with_task_description=True,
                      task_desc=None,
                      ):
    """
    task_freq should be one of 'daily', 'monthly' or 'temp'.

    start_time and end_time should be millisecond-unit timestamps for
    local time.

    time_of_processed_data describes the target date of the processed
    data (usually fetch_time, but it can be something else). It should
    be an int timestamp in milliseconds; if not passed, it defaults to
    the time when this program runs.

    is_done should be True or False, describing whether the recorded
    task has finished.

    task_stats defaults to None; it is a human-readable string
    describing the task status, and can be any string while
    is_done==False.

    If this task must run AFTER another task finishes, the predecessor
    should be recorded via predecessor_task_id, predecessor_task_name
    and predecessor_task_done_time, which all default to None.

    This function is supposed to be called at least twice for each
    tracked task: first when the task starts, recording start_time and
    leaving end_time=None and is_done=False; second when the task
    ends, recording end_time and setting is_done=True.

    Ordinary tasks have been described in detail elsewhere; those
    descriptions are added to the stats by default. The description
    can be turned off explicitly by passing with_task_description=False.
    task_desc is a user-defined task description; if it is not None, it
    overwrites the default description no matter what is passed in
    with_task_description.
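
    Example (a minimal sketch of the two-call pattern described above;
    the task and file names are made-up placeholders, not values from
    this project):

        start_ts = int(datetime.datetime.now().timestamp() * 1e3)
        record_task_stats(task_name='sample_task',
                          program_file_name='sample_task.py',
                          task_freq='daily',
                          start_time=start_ts)
        # ... run the task body ...
        end_ts = int(datetime.datetime.now().timestamp() * 1e3)
        record_task_stats(task_name='sample_task',
                          program_file_name='sample_task.py',
                          task_freq='daily',
                          start_time=start_ts,
                          end_time=end_ts,
                          is_done=True,
                          task_stats='Done')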
""" if time_of_processed_data==None: time_of_processed_data = int(datetime.datetime.now().timestamp()*1e3) day_str = datetime.datetime.fromtimestamp( time_of_processed_data/1e3).isoformat()[:10] doc_id = form_task_doc_id(task_name, day_str) ts_now = int(datetime.datetime.now().timestamp()*1e3) data_d = {'task_name': task_name, 'program_file_name': program_file_name, 'task_freq': task_freq, 'start_time': start_time, 'time_of_processed_data': time_of_processed_data, 'is_done': is_done, 'timestamp': ts_now, } if end_time!=None: data_d['end_time'] = end_time data_d['task_exec_time'] = end_time-start_time if task_stats!=None: data_d['task_status'] = task_stats if predecessor_task_id!=None: data_d['predecessor_task_id'] = predecessor_task_id if predecessor_task_name!=None: data_d['predecessor_task_name'] = predecessor_task_name if predecessor_task_done_time!=None: data_d['predecessor_task_done_time'] = predecessor_task_done_time if with_task_description==True: task_desc_dict = get_task_description() if task_name in task_desc_dict: task_desc = task_desc_dict[task_name] data_d['task_description'] = task_desc if task_desc!=None: data_d['task_description'] = task_desc es.index(id=doc_id, index=task_index, doc_type=task_index_doc_type, body=data_d, request_timeout=100) def get_task_record(task_id): try: get_resp = es.get(index=task_index, doc_type=task_index_doc_type, id=task_id, request_timeout=100) if get_resp['found']==True: task_stats_data = get_resp['_source'] task_stats_data['_id'] = get_resp['_id'] return task_stats_data else: return None except: print('Failed to get stats data for task_id %s' % task_id) return None def is_task_done(task_id): try: task_doc = get_task_record(task_id) except: task_doc=None if task_doc==None: return False else: task_is_done = task_doc['is_done'] return task_is_done def is_task_started(task_id): task_doc = get_task_record(task_id) if task_doc==None: return False else: return True def find_task_record(task_name, day_str_of_processed_data): if '.py' in task_name: # strip .py to get task_name, if however a program filename # is given instead of task_name task_name = task_name.split('.py')[0] task_id = form_task_doc_id(task_name, day_str_of_processed_data) return get_task_record(task_id) def wait_for_pre_task_to_cal(pre_task_names, loggerName, cal_func, cal_func_params, task_name, program_file_name, time_of_processed_data_T, cal_func_kwargs=None, f_log=None, task_freq='daily'): """ Logging method is defaulted to use python's logging utility, but if f_log is given (that's f_log!=None, should be a file object), log message will fall back to use print with file=f_log. Whenever f_log is not None, whatever passed into loggerName will be ignored. cal_func_params is a tuple, cannot be omitted. If a one-element tuple is going to be passed in, say arg1 is the only element, should passing in like cal_func_params=(arg1,) Notice the comma behind arg1 should NOT be omitted, or will be mis-interpreted. cal_func_kwargs is a dict, can be omitted. 
""" def log_func(log_msg, f_log): if f_log==None and loggerName!=None: logger=logging.getLogger('%s.wait_for_pre_task' % loggerName) logger.info(log_msg) elif f_log==None and loggerName==None: print(log_msg, datetime.datetime.now()) else: print(log_msg, datetime.datetime.now(), file=f_log) pre_task_done = [] for tsknm in pre_task_names: task_id = form_task_doc_id(tsknm, pre_task_names[tsknm]['days_str_of_proc_data']) pre_task_done.append(is_task_done(task_id)) pre_task_names[tsknm]['task_id'] = task_id # sort to get consistent order in predecessor_task_id, # predecessor_task_name and predecessor_task_done_time fields predecessor_task_name_Lst = sorted(list(pre_task_names.keys())) predecessor_task_id_Lst = [] for tnm in predecessor_task_name_Lst: predecessor_task_id_Lst.append(pre_task_names[tnm]['task_id']) wait_start_T = datetime.datetime.now() wait_start_ts = int(wait_start_T.timestamp()*1e3) while not all(pre_task_done): log_func('Not all predecessor tasks done, wait...', f_log) time.sleep(60) pre_task_done.clear() undone_tasks = [] for tsknm in pre_task_names: task_id = pre_task_names[tsknm]['task_id'] is_done = is_task_done(task_id) pre_task_done.append(is_done) if not is_done: undone_tasks.append(task_id) wait_present_T = datetime.datetime.now() undone_tasks_str = ','.join(undone_tasks) # write task-stats when task is waiting record_task_stats( task_name=task_name, program_file_name=program_file_name, task_freq=task_freq, start_time=wait_start_ts, # start_time store wait start time at waiting stage time_of_processed_data=int(time_of_processed_data_T.timestamp()*1e3), predecessor_task_id=predecessor_task_id_Lst, predecessor_task_name = predecessor_task_name_Lst, task_stats='Waiting for task %s to finish.' % undone_tasks_str ) if (wait_present_T-wait_start_T).total_seconds()>24*3600: log_func('Have waited for 24 hours and not all predecessor task ' 'done, program exits.', f_log) import sys sys.exit(0) log_func('All predecessor tasks done,', f_log) for tsknm in pre_task_names: task_id = pre_task_names[tsknm]['task_id'] task_stats_data = get_task_record(task_id) done_at = task_stats_data['end_time'] time_of_processed_data = task_stats_data['time_of_processed_data'] time_of_processed_data_str = datetime.datetime.fromtimestamp(time_of_processed_data/1e3).isoformat()[:10] pre_task_names[tsknm]['done_at'] = done_at done_at_str = datetime.datetime.fromtimestamp(done_at/1e3).isoformat() log_func('Task %s for %s has been done at %s' % (tsknm, time_of_processed_data_str, done_at_str), f_log) log_func('Will perform %s' % cal_func.__name__, f_log) # write task status into task-stats index when starts task_start_ts = int(datetime.datetime.now().timestamp()*1e3) predecessor_task_done_time_Lst = [] for tnm in predecessor_task_name_Lst: predecessor_task_done_time_Lst.append(pre_task_names[tnm]['done_at']) record_task_stats( task_name=task_name, program_file_name=program_file_name, task_freq=task_freq, start_time=task_start_ts, time_of_processed_data=int(time_of_processed_data_T.timestamp()*1e3), predecessor_task_id=predecessor_task_id_Lst, predecessor_task_name = predecessor_task_name_Lst, predecessor_task_done_time=predecessor_task_done_time_Lst ) log_func('Will execute function %s with parameter %s' % (cal_func.__name__, cal_func_params), f_log) if cal_func_kwargs is None: cal_func(*cal_func_params) else: cal_func(*cal_func_params, **cal_func_kwargs) # write task status into task-stats index when ends task_end_ts = int(datetime.datetime.now().timestamp()*1e3) record_task_stats( task_name=task_name, 
def update_task_record_fields(task_id, field_name_value_dict):
    """
    GET the doc, store it in memory, update the given values, then PUT
    the whole doc back into es.

    Rather than using the _update API, this PUTs the whole document
    directly. The consideration is that the _update API in
    elasticsearch relies on painless scripts, which can get involved,
    and it is hard to stay forward compatible that way.
    """
    if field_name_value_dict != {}:
        task_dict = get_task_record(task_id)
        # get_task_record returns None when the doc is missing
        if task_dict is None:
            print('No task record found for task_id %s' % task_id)
            return
        task_dict.update(field_name_value_dict)
        # the body should NOT contain the _id field
        task_dict.pop('_id', None)
        data_str = json.dumps(task_dict)
        update_resp = es.index(index=task_index,
                               doc_type=task_index_doc_type,
                               id=task_id, body=data_str)
        print('update_resp: %s' % update_resp)
    else:
        print('Empty field_name_value_dict!')
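
# A hedged example of patching a single field on an existing record
# (the task id and field value below are made-up placeholders; the id
# follows the form_task_doc_id pattern '<task_name>_<day_str>'):
#
#     update_task_record_fields('sample_task_2017-05-27',
#                               {'task_status': 'Re-run scheduled'})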