# -*- coding: utf-8 -*-
"""
Mark selected ``task-stats`` documents in Elasticsearch as done.

Created on Fri Sep  7 15:27:59 2018

@author: zhouyujiang

For every id in ``task_id_list`` the script looks the document up by
``_id`` in the ``task-stats`` index, sets ``is_done`` to ``True``,
``end_time`` to the current time in epoch milliseconds and ``task_stats``
to ``'Done'``, then writes the document back under the same id through
the bulk API.  Ids that match no document are skipped.
"""

import datetime
import json

from elasticsearch import Elasticsearch

# NOTE(review): host, port and credentials are hard-coded in the source;
# consider loading them from configuration or the environment instead.
hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)

# Not used by the code below; kept so the module-level name still exists.
re_list = []

# Ids of the task-stats documents to mark as done.  The commented-out
# entries are disabled ids (presumably from earlier runs -- uncomment to
# process them again).
task_id_list = [
    'calculate_weekly_net_inc_by_redis_weekly_task_2019-06-10',
    # 'monthly_aggs_daily_url_daily_task_call_func_2019-02-13',
    # 'monthly_aggs_daily_url_daily_task_call_func_2019-02-14',
    # 'monthly_aggs_daily_url_daily_task_call_func_2019-02-15',
    # 'monthly_aggs_daily_url_daily_task_call_func_2019-02-16',
]

for task_id in task_id_list:
    # Exact-match lookup of the document by its id.
    search_body = {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"_id": task_id}}
                ]
            }
        }
    }
    search_re = es.search(index='task-stats', doc_type='doc',
                          body=search_body)

    # Test the returned hits rather than hits.total: on Elasticsearch 7+
    # hits.total is an object, so comparing it with an int raises TypeError.
    hits = search_re['hits']['hits']
    if not hits:
        continue

    line = hits[0]['_source']
    line['is_done'] = True
    line['end_time'] = int(datetime.datetime.now().timestamp() * 1000)
    line['task_stats'] = 'Done'

    # Serialise the action line with json.dumps so that ids containing
    # quotes or backslashes still produce valid NDJSON.
    bulk_head = json.dumps({"index": {"_id": task_id}}, ensure_ascii=False)
    data_str = json.dumps(line, ensure_ascii=False)
    bulk_one_body = bulk_head + '\n' + data_str + '\n'

    bulk_response = es.bulk(index='task-stats', doc_type='doc',
                            body=bulk_one_body, request_timeout=200)
    if bulk_response['errors']:
        # Show both the per-item errors and the request that caused them.
        print(bulk_response['items'])
        print(bulk_one_body)