task_stats.py 12.1 KB
# -*- coding: utf-8 -*-
"""
Created on Sun May 27 23:28:45 2017

@author: hanye
"""

import datetime, time
from elasticsearch import Elasticsearch
import logging
import json

hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)

task_index = 'task-stats'
task_index_doc_type = 'doc'

def get_task_description(path=None, filename=None):
    if path==None:
        path = '/home/hanye/project_data/Python/hyClass'
    if path[-1]!='/':
        path += '/'
    if filename==None:
        filename = 'task_description_dict'
    try:
        with open(path+filename, 'r', encoding='utf-8') as fr:
            task_desc_dict = json.load(fr)
        return task_desc_dict
    except:
        return None


def form_task_doc_id(task_name, day_str):
    doc_id = '%s_%s' % (task_name, day_str)
    return doc_id

def record_task_stats(task_name, program_file_name, task_freq,
                      start_time, end_time=None,
                      time_of_processed_data=None,
                      is_done=False, task_stats=None,
                      predecessor_task_id=None,
                      predecessor_task_name=None,
                      predecessor_task_done_time=None,
                      with_task_description=True,
                      task_desc=None,
                     ):
    """
    task_freq should be one of 'daily', 'monthly' or 'temp'.
    task_start and task_end should be millisecond-unit timestamp for local time.
    time_of_processed_data describes the target date for the processed data
    (usually fetch_time, can be others), should be
    a timestamp of int number in millisecond, if not passed,
    will default to the time when running this program.
    is_done should be True of False, describe if the task to be record has been
    done.
    task_stats default to None, is a string to describe task status
    for human read, can be other string when is_done==False.
    If this task must performs AFTER another task finished, the predecessor one
    should be recorded by predecessor_task_id, predecessor_task_name and
    predecessor_task_done_time, which are all default to None.
    This function is supposed to be called twice at least for each tracked task:
    First when the task starts, record the task_start, leave task_end=None,
    and is_done=False. Second time, when the task ends, record task_end and shift
    is_done to be True.
    Ordinary tasks have been described in detail, these description will be added
    into stats by default. When calling this method, the description can be turned
    off explictly by passing with_task_description=False.
    task_desc is the user-define task description, if given not None, will overwrite
    defaulted description no matter what's passed in by with_task_description.
    """
    if time_of_processed_data==None:
        time_of_processed_data = int(datetime.datetime.now().timestamp()*1e3)
    day_str = datetime.datetime.fromtimestamp(
                  time_of_processed_data/1e3).isoformat()[:10]
    doc_id = form_task_doc_id(task_name, day_str)
    ts_now = int(datetime.datetime.now().timestamp()*1e3)
    data_d = {'task_name': task_name,
              'program_file_name': program_file_name,
              'task_freq': task_freq,
              'start_time': start_time,
              'time_of_processed_data': time_of_processed_data,
              'is_done': is_done,
              'timestamp': ts_now,
              }
    if end_time!=None:
        data_d['end_time'] = end_time
        data_d['task_exec_time'] = end_time-start_time
    if task_stats!=None:
        data_d['task_status'] = task_stats
    if predecessor_task_id!=None:
        data_d['predecessor_task_id'] = predecessor_task_id
    if predecessor_task_name!=None:
        data_d['predecessor_task_name'] = predecessor_task_name
    if predecessor_task_done_time!=None:
        data_d['predecessor_task_done_time'] = predecessor_task_done_time

    if with_task_description==True:
        task_desc_dict = get_task_description()
        if task_name in task_desc_dict:
            task_desc = task_desc_dict[task_name]
            data_d['task_description'] = task_desc
    if task_desc!=None:
        data_d['task_description'] = task_desc

    es.index(id=doc_id, index=task_index, doc_type=task_index_doc_type,
             body=data_d,
             request_timeout=100)

def get_task_record(task_id):
    try:
        get_resp = es.get(index=task_index, doc_type=task_index_doc_type,
                          id=task_id, request_timeout=100)
        if get_resp['found']==True:
            task_stats_data = get_resp['_source']
            task_stats_data['_id'] = get_resp['_id']
            return task_stats_data
        else:
            return None
    except:
        print('Failed to get stats data for task_id %s' % task_id)
        return None

def is_task_done(task_id):
    try:
        task_doc = get_task_record(task_id)
    except:
        task_doc=None
    if task_doc==None:
        return False
    else:
        task_is_done = task_doc['is_done']
        return task_is_done

def is_task_started(task_id):
    task_doc = get_task_record(task_id)
    if task_doc==None:
        return False
    else:
        return True

def find_task_record(task_name, day_str_of_processed_data):
    if '.py' in task_name:
        # strip .py to get task_name, if however a program filename
        # is given instead of task_name
        task_name = task_name.split('.py')[0]
    task_id = form_task_doc_id(task_name, day_str_of_processed_data)
    return get_task_record(task_id)


def wait_for_pre_task_to_cal(pre_task_names, loggerName,
                             cal_func,
                             cal_func_params,
                             task_name, program_file_name,
                             time_of_processed_data_T,
                             cal_func_kwargs=None,
                             f_log=None,
                             task_freq='daily'):
    """
    Logging method is defaulted to use python's logging utility,
    but if f_log is given (that's f_log!=None, should be a
    file object), log message will fall back to use print
    with file=f_log. Whenever f_log is not None, whatever passed
    into loggerName will be ignored.

    cal_func_params is a tuple, cannot be omitted. If a one-element
    tuple is going to be passed in, say arg1 is the only element,
    should passing in like cal_func_params=(arg1,)
    Notice the comma behind arg1 should NOT be omitted, or
    will be mis-interpreted.
    cal_func_kwargs is a dict, can be omitted.
    """
    def log_func(log_msg, f_log):
        if f_log==None and loggerName!=None:
            logger=logging.getLogger('%s.wait_for_pre_task' % loggerName)
            logger.info(log_msg)
        elif f_log==None and loggerName==None:
            print(log_msg, datetime.datetime.now())
        else:
            print(log_msg, datetime.datetime.now(), file=f_log)

    pre_task_done = []
    for tsknm in pre_task_names:
        task_id = form_task_doc_id(tsknm,
                        pre_task_names[tsknm]['days_str_of_proc_data'])
        pre_task_done.append(is_task_done(task_id))
        pre_task_names[tsknm]['task_id'] = task_id

    # sort to get consistent order in predecessor_task_id,
    # predecessor_task_name and predecessor_task_done_time fields
    predecessor_task_name_Lst = sorted(list(pre_task_names.keys()))

    predecessor_task_id_Lst = []
    for tnm in predecessor_task_name_Lst:
        predecessor_task_id_Lst.append(pre_task_names[tnm]['task_id'])
    wait_start_T = datetime.datetime.now()
    wait_start_ts = int(wait_start_T.timestamp()*1e3)
    while not all(pre_task_done):
        log_func('Not all predecessor tasks done, wait...', f_log)
        time.sleep(60)
        pre_task_done.clear()
        undone_tasks = []
        for tsknm in pre_task_names:
            task_id = pre_task_names[tsknm]['task_id']
            is_done = is_task_done(task_id)
            pre_task_done.append(is_done)
            if not is_done:
                undone_tasks.append(task_id)
        wait_present_T = datetime.datetime.now()
        undone_tasks_str = ','.join(undone_tasks)
        # write task-stats when task is waiting
        record_task_stats(
             task_name=task_name,
             program_file_name=program_file_name,
             task_freq=task_freq,
             start_time=wait_start_ts, # start_time store wait start time at waiting stage
             time_of_processed_data=int(time_of_processed_data_T.timestamp()*1e3),
             predecessor_task_id=predecessor_task_id_Lst,
             predecessor_task_name = predecessor_task_name_Lst,
             task_stats='Waiting for task %s to finish.' % undone_tasks_str
             )
        if (wait_present_T-wait_start_T).total_seconds()>24*3600:
            log_func('Have waited for 24 hours and not all predecessor task '
                         'done, program exits.', f_log)
            import sys
            sys.exit(0)

    log_func('All predecessor tasks done,', f_log)
    for tsknm in pre_task_names:
        task_id = pre_task_names[tsknm]['task_id']
        task_stats_data = get_task_record(task_id)
        done_at = task_stats_data['end_time']
        time_of_processed_data = task_stats_data['time_of_processed_data']
        time_of_processed_data_str = datetime.datetime.fromtimestamp(time_of_processed_data/1e3).isoformat()[:10]
        pre_task_names[tsknm]['done_at'] = done_at
        done_at_str = datetime.datetime.fromtimestamp(done_at/1e3).isoformat()
        log_func('Task %s for %s has been done at %s' % (tsknm,
                    time_of_processed_data_str, done_at_str), f_log)

    log_func('Will perform %s' % cal_func.__name__, f_log)

    # write task status into task-stats index when starts
    task_start_ts = int(datetime.datetime.now().timestamp()*1e3)
    predecessor_task_done_time_Lst = []
    for tnm in predecessor_task_name_Lst:
        predecessor_task_done_time_Lst.append(pre_task_names[tnm]['done_at'])
    record_task_stats(
         task_name=task_name,
         program_file_name=program_file_name,
         task_freq=task_freq,
         start_time=task_start_ts,
         time_of_processed_data=int(time_of_processed_data_T.timestamp()*1e3),
         predecessor_task_id=predecessor_task_id_Lst,
         predecessor_task_name = predecessor_task_name_Lst,
         predecessor_task_done_time=predecessor_task_done_time_Lst
         )

    log_func('Will execute function %s with parameter %s'
                % (cal_func.__name__, cal_func_params), f_log)
    if cal_func_kwargs is None:
        cal_func(*cal_func_params)
    else:
        cal_func(*cal_func_params, **cal_func_kwargs)
    # write task status into task-stats index when ends
    task_end_ts = int(datetime.datetime.now().timestamp()*1e3)
    record_task_stats(
         task_name=task_name,
         program_file_name=program_file_name,
         task_freq=task_freq,
         start_time=task_start_ts,
         time_of_processed_data=int(time_of_processed_data_T.timestamp()*1e3),
         predecessor_task_id=predecessor_task_id_Lst,
         predecessor_task_name = predecessor_task_name_Lst,
         predecessor_task_done_time=predecessor_task_done_time_Lst,
         end_time=task_end_ts,
         is_done=True,
         task_stats='Done',
         )

def update_task_record_fields(task_id, field_name_value_dict):
    """
    GET doc, store in memeory, update values given, PUT into es.
    Rather than using _update API, using PUT whole directly.
    The consideration is that, _update API in elasticsearch
    relies on painless script, which might involve heavily.
    It's hard to be forward compatible in this case.
    """
    if field_name_value_dict!={}:
        task_dict = get_task_record(task_id)
        task_dict.update(field_name_value_dict)
        # body str should NOT contain _id field
        task_dict.pop('_id', None)
        data_str = json.dumps(task_dict)
        update_resp = es.index(index=task_index, doc_type=task_index_doc_type,
                               id=task_id, body=data_str)
        print('update_resp: %s' % update_resp)
    else:
        print('Empty field_name_value_dict!')