1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# -*- coding: utf-8 -*-
"""
Created on Fri Sep 7 15:27:59 2018
@author: zhouyujiang
"""
import datetime
import json
from elasticsearch import Elasticsearch
import os  # kept next to the settings that use it so this block is self-contained

# Elasticsearch connection settings.
# Each value may be overridden through an environment variable; the literals
# are kept as fallbacks so the script behaves exactly as before when none of
# the variables are set.
# NOTE(review): the fallback password is committed in plain text -- rotate it
# and remove the literal once the environment variables are provisioned.
hosts = os.environ.get('TASK_STATS_ES_HOST', '192.168.17.11')
port = int(os.environ.get('TASK_STATS_ES_PORT', '80'))
user = os.environ.get('TASK_STATS_ES_USER', 'zhouyujiang')
passwd = os.environ.get('TASK_STATS_ES_PASSWD', '8tM9JDN2LVxM')
http_auth = (user, passwd)

# Shared client used by the rest of the script.
es = Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)
#import logging
#import argparse
#from func_calculate_monthly_net_inc import cal_monthly_net_inc
#from func_monthly_aggregated_daily_url_for_one_fetch_day import define_monthly_data_slice_type
#from func_data_correction_for_nag_MNI import data_correction
#import task_stats
#runday = datetime.datetime.now()
#if runday.day == 1:
# last_day_in_the_month_T = runday - datetime.timedelta(days=1)
#else:
# if runday.month == 1:
# month_pre = 12
# year_pre = runday.year - 1
# last_day_in_the_month_T = datetime.datetime(year_pre, month_pre, 31)
# else:
# month_pre = runday.month - 1
# year_pre = runday.year
# last_day_in_the_month_T = (datetime.datetime(year_pre, runday.month, 1)
# - datetime.timedelta(days=1))
#last_day_in_the_month_str = last_day_in_the_month_T.isoformat()[:10]
#doc_type_monthly = 'daily-url-%s' % last_day_in_the_month_str
#first_day_in_next_month_T = last_day_in_the_month_T + datetime.timedelta(days=1)
#year_start = last_day_in_the_month_T.year
#month_start = last_day_in_the_month_T.month
#cal_month_str = datetime.datetime.strftime(last_day_in_the_month_T, '%b%Y')
#cal_month_T = last_day_in_the_month_T
#task_start_ts = 1536203293188
#task_end_ts = int(datetime.datetime.now().timestamp()*1e3)
#task_end_ts = int(datetime.datetime.now().timestamp()*1e3)
#task_stats.record_task_stats(
# task_name='write_qingbo_TK_ATU_data_from_ftp_daily_task_2019-01-19',
# program_file_name='calculate_monthly_net_inc_for_missed_MNI_monthly_task.py',
# task_freq='monthly',
# start_time=task_start_ts,
# time_of_processed_data=int(cal_month_T.timestamp()*1e3),
# end_time=task_end_ts,
# is_done=True,
# task_stats='Done',
# )
# Not referenced in the visible part of the script; kept so any later code
# that expects the name still finds it.
re_list = []

# Ids of the 'task-stats' documents that should be forced to the "Done" state.
# The commented-out entries are ids from earlier manual runs.
task_id_list = [
    'calculate_weekly_net_inc_by_redis_weekly_task_2019-06-10',
    # 'monthly_aggs_daily_url_daily_task_call_func_2019-02-13',
    # 'monthly_aggs_daily_url_daily_task_call_func_2019-02-14',
    # 'monthly_aggs_daily_url_daily_task_call_func_2019-02-15',
    # 'monthly_aggs_daily_url_daily_task_call_func_2019-02-16',
]

for task_id in task_id_list:
    # Fetch the current task record by its document id.
    search_body = {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"_id": task_id}}
                ]
            }
        }
    }
    search_re = es.search(index='task-stats', doc_type='doc', body=search_body)

    # Test the returned hits rather than hits.total: the list is what gets
    # indexed below, and hits.total is not a plain integer on every
    # Elasticsearch version.
    hits = search_re['hits']['hits']
    if not hits:
        continue  # no such task record, nothing to update

    # Re-use the stored document and overwrite only the completion fields.
    line = hits[0]['_source']
    line['is_done'] = True
    line['end_time'] = int(datetime.datetime.now().timestamp() * 1000)  # epoch ms
    line['task_stats'] = 'Done'

    # Build the two-line bulk payload (action line + document). json.dumps
    # escapes the id properly, which manual string formatting did not.
    bulk_head = json.dumps({"index": {"_id": task_id}}, ensure_ascii=False)
    data_str = json.dumps(line, ensure_ascii=False)
    bulk_one_body = bulk_head + '\n' + data_str + '\n'

    eror_dic = es.bulk(index='task-stats', doc_type='doc',
                       body=bulk_one_body, request_timeout=200)
    if eror_dic['errors']:
        # Show both the per-item errors and the payload that caused them.
        print(eror_dic['items'])
        print(bulk_one_body)