# -*- coding:UTF-8 -*-
# @Time : 2020/12/1 10:46
# @File : es_status.py
# @email : litao@igengmei.com
# @author : litao
#
# Elasticsearch health monitoring: periodically polls the cluster/node stats
# endpoints and pushes alert messages to DingTalk when thresholds are crossed.

import time
import datetime

import requests

from meta_base_code.send_msg_to_dingding.send_msg import send_msg_to_dingtalk

# Per-node state remembered between polling rounds (last old-GC counters and
# thread-pool rejected counts) so alerts can fire on deltas, not absolutes.
data_dict = {}

# Alert thresholds.
memory_used_rate_top = 0.8  # JVM heap usage ratio
fs_free_rate_top = 0.8      # disk usage ratio
cpu_top = 75                # process CPU percent
search_queue_max = 1        # search thread-pool queue length

# NOTE(review): credentials hard-coded in source; move to env vars / config.
secret = 'SECbbfd6b7403869cf8a31e63e2d623378bd8a55b5a31083fad6421ee817794f485'
access_token = '6eb687358606347cef617237cddab6a80e2f5981b46fe04950a96152e387f35c'

# Seconds before an HTTP request to a node is abandoned; without this a dead
# node would hang the whole monitoring loop forever.
REQUEST_TIMEOUT = 10


def _split_auth(ip):
    """Split an 'host@user:password' spec into (host, (user, password)).

    Returns (host, None) when no credentials are embedded in *ip*.
    """
    if "@" in ip:
        host, auth = ip.split("@", 1)
        return host, tuple(auth.split(':', 1))
    return ip, None


def _http_get(url, auth):
    """GET *url* with optional basic auth and a bounded timeout."""
    if auth:
        return requests.get(url, verify=False, auth=auth, timeout=REQUEST_TIMEOUT)
    return requests.get(url, timeout=REQUEST_TIMEOUT)


def get_cluster_stats(ip):
    """Fetch /_cluster/stats for one cluster.

    :param ip: host, optionally as 'host@user:password'
    :return: (host, cluster status string, unix timestamp,
              heap used ratio rounded to 4 places,
              disk used ratio rounded to 4 places)
    """
    ip, auth = _split_auth(ip)
    url = "http://{ip}:9200/_cluster/stats".format(ip=ip)
    res_json = _http_get(url, auth).json()

    stats = res_json.get("status")
    timestamp = int(datetime.datetime.now().timestamp())

    mem = res_json["nodes"]['jvm']["mem"]
    memory_used_rate = round(mem.get("heap_used_in_bytes") / mem.get("heap_max_in_bytes"), 4)

    fs = res_json["nodes"]["fs"]
    # fraction of disk already consumed (1 - available/total)
    fs_free_rate = round(1 - fs.get("available_in_bytes") / fs.get("total_in_bytes"), 4)

    return ip, stats, timestamp, memory_used_rate, fs_free_rate


def get_index_stats(ip, index):
    """Fetch /{index}/_stats and return the parsed JSON body.

    Fixed: the endpoint is '_stats' (the original hit '/{index}/stats', which
    is not an ES API), and the parsed response is now returned to the caller.
    """
    index_requests_url = "http://{ip}:9200/{index}/_stats".format(ip=ip, index=index)
    node_requests_res = requests.get(index_requests_url, timeout=REQUEST_TIMEOUT)
    return node_requests_res.json()


def get_node_status(ip):
    """Fetch /_nodes/stats and yield one metrics tuple per node.

    Yields:
        (node_name, node_ip, query_time_ms, query_current, fetch_current,
         cpu_percent, process_cpu_percent, avg_young_gc_ms, old_gc_count,
         avg_old_gc_ms, search_thread_pool_dict)
    """
    ip, auth = _split_auth(ip)
    url = "http://{ip}:9200/_nodes/stats".format(ip=ip)
    res_json = _http_get(url, auth).json()

    for data_name in res_json["nodes"]:
        data = res_json['nodes'][data_name]
        node_name = data['name']
        node_ip = data['ip']

        search = data["indices"]["search"]
        query_time = search["query_time_in_millis"]
        query_current = search["query_current"]
        fetch_current = search["fetch_current"]

        cpu_percent = data["os"]["cpu"]["percent"]
        cpu_1m = data["process"]["cpu"]["percent"]

        collectors = data["jvm"]["gc"]["collectors"]
        young_gc = collectors['young']['collection_count']
        young_gc_ms = collectors['young']['collection_time_in_millis']
        # Fixed: the original read the 'young' collector here too, so old-GC
        # growth alerts were actually tracking young GC.
        old_gc = collectors['old']['collection_count']
        old_gc_ms = collectors['old']['collection_time_in_millis']

        search_thread_pool = data["thread_pool"]["search"]

        # Guard against division by zero on a node that has not collected yet.
        young_gc_per_ms = int(young_gc_ms / young_gc) if young_gc else 0
        old_gc_ms = int(old_gc_ms / old_gc) if old_gc else 0

        yield (node_name, node_ip, query_time, query_current, fetch_current,
               cpu_percent, cpu_1m, young_gc_per_ms, old_gc, old_gc_ms,
               search_thread_pool)


def parse_cluster_stats(data):
    """Build an alert string from a get_cluster_stats() tuple.

    Returns '' when everything is within thresholds; otherwise a
    timestamp-prefixed message describing each breached threshold.
    """
    res_str = ""
    ip, stats, timestamp, memory_used_rate, fs_free_rate = data

    if memory_used_rate >= memory_used_rate_top:
        res_str += "%s集群内存使用率超过%s 目前为%s\n" % (
            ip, str(round(memory_used_rate_top * 100, 2)) + '%',
            str(round(memory_used_rate * 100, 2)) + '%')
    if fs_free_rate >= fs_free_rate_top:
        res_str += "%s集群硬盘占用超过%s 目前为%s\n" % (
            ip, str(round(fs_free_rate_top * 100, 2)) + '%',
            str(round(fs_free_rate * 100, 2)) + '%')

    if res_str:
        res_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " " + res_str
    return res_str


def parse_node_stats(data):
    """Build an alert string from one get_node_status() tuple.

    Mutates module-level data_dict so the next round can compare against
    this round's GC and rejected counters. Returns '' when nothing alerts.
    """
    res_str = ""
    node_name, ip, query_time, query_current, fetch_current, cpu_percent, cpu_1m, young_gc, \
        old_gc, old_gc_ms, search_thread_pool = data

    if not data_dict.get(node_name):
        data_dict[node_name] = {}
    old_gc_last = data_dict[node_name].get('old_gc', old_gc)
    rejected_last = data_dict[node_name].get('rejected', 0)

    if search_thread_pool['queue'] > search_queue_max:
        res_str += '{}节点 search queue 目前为{} \n'.format(node_name, str(search_thread_pool['queue']))
    if search_thread_pool['rejected'] > rejected_last:
        res_str += '{}节点 search rejected 目前为 {} \n'.format(node_name, str(search_thread_pool['rejected']))
    if old_gc - old_gc_last > 0:
        res_str += "{name}节点 old_gc增长中 old_gc为{old_gc} 平均{old_gc_ms}ms\n".format(
            name=node_name, old_gc=str(old_gc), old_gc_ms=str(old_gc_ms))
    if cpu_1m >= cpu_top:
        res_str += "{name}节点CPU使用率预警 1m {cpu_1m} \n" \
                   "目前 query_current为{query_current} fetch_current为{fetch_current} young_gc为{young_gc}ms".format(
            cpu_1m=str(int(cpu_1m)) + "%", name=node_name, query_current=str(query_current),
            fetch_current=str(fetch_current), young_gc=str(young_gc))

    # Remember the current values for next round's delta comparisons.
    # Fixed: the original wrote the *previous* value under a key
    # ('old_gc_ms_last') that was never read back.
    data_dict[node_name]['old_gc'] = old_gc
    data_dict[node_name]['old_gc_ms'] = old_gc_ms
    data_dict[node_name].update(search_thread_pool)

    if res_str:
        res_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " " + res_str
    return res_str


def main_task(ip_list):
    """Poll every cluster in *ip_list* forever, sending alerts to DingTalk.

    :param ip_list: list of 'host' or 'host@user:password' strings.
    """
    while True:
        for ip in ip_list:
            try:
                data = get_cluster_stats(ip)
                res = parse_cluster_stats(data)
                if res:
                    # data[0] is the credential-stripped host — never put the
                    # raw 'host@user:password' string into a chat message.
                    send_msg_to_dingtalk(data[0] + res, access_token=access_token, secret=secret)
                for node_data in get_node_status(ip):
                    # Fixed: parse_node_stats was called twice per node; the
                    # first (discarded) call updated data_dict, so the second
                    # call's delta-based alerts could never fire.
                    res = parse_node_stats(node_data)
                    if res:
                        send_msg_to_dingtalk(res, access_token=access_token, secret=secret)
            except Exception as e:
                # Best-effort monitoring: one bad cluster must not stop the loop.
                print(ip, e)
        time.sleep(10)


if __name__ == "__main__":
    ip_list = ['172.16.52.29',
               '172.16.52.25@elastic:gengmei!@#',
               '172.16.52.33@elastic:gengmei!@#']
    main_task(ip_list)