# modif_true_task_state.py 3.5 KB
# -*- coding: utf-8 -*-
"""
Created on Fri Sep  7 15:27:59 2018

@author: zhouyujiang
"""

import datetime
import json
import os

from elasticsearch import Elasticsearch
# Connection settings for the Elasticsearch cluster. Each value can be
# overridden through an environment variable so credentials do not have to be
# edited into this file; when a variable is unset the original hard-coded
# value is used, so existing invocations behave exactly as before.
# NOTE(review): a password committed to source control should be rotated and
# the fallback below removed once ES_PASSWD is provisioned where this runs.
hosts = os.environ.get('ES_HOST', '192.168.17.11')
port = int(os.environ.get('ES_PORT', '80'))
user = os.environ.get('ES_USER', 'zhouyujiang')
passwd = os.environ.get('ES_PASSWD', '8tM9JDN2LVxM')
http_auth = (user, passwd)

# Client used by the search and bulk calls further down in this script.
es = Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)
#import logging
#import argparse
#from func_calculate_monthly_net_inc import cal_monthly_net_inc
#from func_monthly_aggregated_daily_url_for_one_fetch_day import define_monthly_data_slice_type
#from func_data_correction_for_nag_MNI import data_correction
#import task_stats

#runday = datetime.datetime.now()
#if runday.day == 1:
#    last_day_in_the_month_T = runday - datetime.timedelta(days=1)
#else:
#    if runday.month == 1:
#        month_pre = 12
#        year_pre = runday.year - 1
#        last_day_in_the_month_T = datetime.datetime(year_pre, month_pre, 31)
#    else:
#        month_pre = runday.month - 1
#        year_pre = runday.year
#        last_day_in_the_month_T = (datetime.datetime(year_pre, runday.month, 1)
#                                   - datetime.timedelta(days=1))
#last_day_in_the_month_str = last_day_in_the_month_T.isoformat()[:10]
#doc_type_monthly = 'daily-url-%s' % last_day_in_the_month_str
#first_day_in_next_month_T = last_day_in_the_month_T + datetime.timedelta(days=1)
#year_start = last_day_in_the_month_T.year
#month_start = last_day_in_the_month_T.month
#cal_month_str = datetime.datetime.strftime(last_day_in_the_month_T, '%b%Y')
#cal_month_T = last_day_in_the_month_T
#task_start_ts = 1536203293188
#task_end_ts = int(datetime.datetime.now().timestamp()*1e3)
#task_end_ts = int(datetime.datetime.now().timestamp()*1e3)
#task_stats.record_task_stats(
#    task_name='write_qingbo_TK_ATU_data_from_ftp_daily_task_2019-01-19',
#    program_file_name='calculate_monthly_net_inc_for_missed_MNI_monthly_task.py',
#    task_freq='monthly',
#    start_time=task_start_ts,
#    time_of_processed_data=int(cal_month_T.timestamp()*1e3),
#    end_time=task_end_ts,
#    is_done=True,
#    task_stats='Done',
#    )
# Not used below; retained so the module keeps exposing the same name.
re_list = []

# Ids of the 'task-stats' documents to force into the "Done" state. Edit this
# list before each run; the commented-out entries are ids used on earlier runs.
task_id_list = [
    'calculate_weekly_net_inc_by_redis_weekly_task_2019-06-10',
    # 'monthly_aggs_daily_url_daily_task_call_func_2019-02-13',
    # 'monthly_aggs_daily_url_daily_task_call_func_2019-02-14',
    # 'monthly_aggs_daily_url_daily_task_call_func_2019-02-15',
    # 'monthly_aggs_daily_url_daily_task_call_func_2019-02-16',
]


def _total_hits(search_result):
    """Return the number of hits reported by a search response.

    ``hits.total`` is accepted both as a plain integer and as an object of
    the form ``{"value": N, ...}`` so the check works with either response
    shape.
    """
    total = search_result['hits']['total']
    if isinstance(total, dict):
        return total['value']
    return total


def _build_bulk_body(doc_id, source):
    """Return the newline-delimited bulk payload that indexes *source* as *doc_id*.

    The action line is produced with ``json.dumps`` rather than string
    interpolation so an id containing quotes or backslashes still yields
    valid JSON.
    """
    action_line = json.dumps({"index": {"_id": doc_id}}, ensure_ascii=False)
    source_line = json.dumps(source, ensure_ascii=False)
    return action_line + '\n' + source_line + '\n'


for task_id in task_id_list:
    # Look the task record up by its document id.
    search_body = {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"_id": task_id}},
                ]
            }
        }
    }
    search_re = es.search(index='task-stats', doc_type='doc', body=search_body)
    if _total_hits(search_re) <= 0:
        # Report the miss instead of skipping silently, so a mistyped id is
        # noticed by whoever runs the script.
        print('task id not found in task-stats, skipped: %s' % task_id)
        continue

    # Take the stored document and overwrite only the completion fields.
    line = search_re['hits']['hits'][0]['_source']
    line['is_done'] = True
    line['end_time'] = int(datetime.datetime.now().timestamp() * 1000)  # ms
    line['task_stats'] = 'Done'

    # Re-index the whole document under the same id.
    bulk_one_body = _build_bulk_body(task_id, line)
    error_dic = es.bulk(index='task-stats', doc_type='doc',
                        body=bulk_one_body, request_timeout=200)
    if error_dic['errors']:
        print(error_dic['items'])
        # Show the payload that was actually sent (the original printed an
        # always-empty string here).
        print(bulk_one_body)