#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Elasticsearch helper (ESPerform) plus a one-off verification script.

The class wraps an ``elasticsearch`` client with the project's index naming
convention (``<prefix>-<name>[-read|-write]``) and index/alias/mapping/query
helpers.  Every method swallows exceptions, logs the traceback and returns a
neutral value, so callers must check return values.

The trailing script compares topic documents between the old and the new
index and prints any field-level differences.  It only runs when this file
is executed directly (it previously ran at import time, which was a bug).
"""

import os
import os.path
import sys
import re
import json
import logging
import traceback

from elasticsearch import Elasticsearch
import elasticsearch.helpers
from django.conf import settings

# ES cluster entry point (presumably an internal host — verify before reuse).
ES_INFO_LIST = [
    {
        "host": "172.17.32.22",
        "port": 9200
    }
]

# Every official index name is prefixed with this string.
ES_INDEX_PREFIX = "gm-dbmw"


class ESPerform(object):

    cli_obj = None                 # last client created by get_cli()
    cli_info_list = ES_INFO_LIST   # default host list
    index_prefix = ES_INDEX_PREFIX

    @classmethod
    def get_cli(cls, cli_info=None):
        """Create (and cache on the class) an Elasticsearch client.

        :param cli_info: optional host list; defaults to ``cls.cli_info_list``.
        :return: the client instance, or None on failure.
        """
        try:
            init_args = {
                'sniff_on_start': False,
                'sniff_on_connection_fail': False,
            }
            es_cli_info = cli_info if cli_info else cls.cli_info_list
            cls.cli_obj = Elasticsearch(hosts=es_cli_info, **init_args)
            return cls.cli_obj
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return None

    @classmethod
    def get_official_index_name(cls, sub_index_name, index_flag=None):
        """Build the official ES index (or alias) name.

        :param sub_index_name: logical index name, e.g. ``topic``.
        :param index_flag: None, "read" or "write"; appended as a suffix.
        :return: e.g. ``gm-dbmw-topic-read``, or None on failure.
        """
        try:
            assert (index_flag in [None, "read", "write"])
            official_index_name = cls.index_prefix + "-" + sub_index_name
            if index_flag:
                official_index_name += "-" + index_flag
            return official_index_name
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return None

    @classmethod
    def __load_mapping(cls, doc_type):
        """Load ``trans2es/mapping/<doc_type>.json``, stripping ``//`` comments.

        :return: the parsed mapping dict, or None on failure.
        """
        try:
            mapping_file_path = os.path.join(
                os.path.dirname(__file__),
                '..', 'trans2es', 'mapping', '%s.json' % (doc_type,))
            mapping = ''
            with open(mapping_file_path, 'r') as f:
                for line in f:
                    # Strip //-style comments so the file can be parsed as JSON.
                    mapping += re.sub(r'//.*$', '', line)
            mapping = json.loads(mapping)
            return mapping
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return None

    @classmethod
    def create_index(cls, es_cli, sub_index_name):
        """Create the official index plus its ``-read``/``-write`` aliases.

        :param es_cli: a connected client (must not be None).
        :param sub_index_name: logical index name.
        :return: True on success, False on failure.
        """
        try:
            assert (es_cli is not None)

            official_index_name = cls.get_official_index_name(sub_index_name)
            index_exist = es_cli.indices.exists(official_index_name)
            if not index_exist:
                es_cli.indices.create(official_index_name)
                # Both aliases point at the same physical index; the split
                # allows a later reindex to swap them independently.
                read_alias_name = cls.get_official_index_name(sub_index_name, "read")
                es_cli.indices.put_alias(official_index_name, read_alias_name)
                write_alias_name = cls.get_official_index_name(sub_index_name, "write")
                es_cli.indices.put_alias(official_index_name, write_alias_name)
            return True
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    @classmethod
    def put_index_mapping(cls, es_cli, sub_index_name, mapping_type="_doc", force_sync=False):
        """Push the JSON mapping file onto the index's write alias.

        :param es_cli: a connected client (must not be None).
        :param sub_index_name: logical index name (also the mapping file name).
        :param mapping_type: ES document type.
        :param force_sync: attempt the put even if the write alias is missing.
        :return: True on success, False otherwise.
        """
        try:
            assert (es_cli is not None)

            write_alias_name = cls.get_official_index_name(sub_index_name, "write")
            index_exist = es_cli.indices.exists(write_alias_name)
            if not index_exist and not force_sync:
                return False

            mapping_dict = cls.__load_mapping(sub_index_name)
            es_cli.indices.put_mapping(index=write_alias_name, body=mapping_dict,
                                       doc_type=mapping_type)
            return True
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    @classmethod
    def put_indices_template(cls, es_cli, template_file_name, template_name):
        """Install an index template loaded from a mapping file.

        :param es_cli: a connected client (must not be None).
        :param template_file_name: mapping file name (without ``.json``).
        :param template_name: name to register the template under.
        :return: True on success, False on failure.
        """
        try:
            assert (es_cli is not None)
            mapping_dict = cls.__load_mapping(template_file_name)
            es_cli.indices.put_template(name=template_name, body=mapping_dict)
            return True
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    @classmethod
    def es_helpers_bulk(cls, es_cli, data_list, sub_index_name, auto_create_index=False,
                        doc_type="_doc"):
        """Bulk-index ``data_list`` into the write alias of ``sub_index_name``.

        Topic-style indices are routed by ``content_level``; every document
        must carry an ``id`` field (used as ``_id``).

        :param auto_create_index: create index + mapping when missing instead
            of failing.
        :return: True on success, False on failure.
        """
        try:
            assert (es_cli is not None)
            official_index_name = cls.get_official_index_name(sub_index_name, "write")
            index_exists = es_cli.indices.exists(official_index_name)
            if not index_exists:
                if not auto_create_index:
                    logging.error("index:%s is not existing,bulk data error!"
                                  % official_index_name)
                    return False
                else:
                    cls.create_index(es_cli, sub_index_name)
                    cls.put_index_mapping(es_cli, sub_index_name)

            # These indices are custom-routed by content level — keep in sync
            # with the routing used by get_search_results callers.
            routed = sub_index_name in ("topic", "topic-star-routing", "topic-high-star")
            bulk_actions = []
            for data in data_list:
                if not data:
                    continue
                action = {
                    '_op_type': 'index',
                    '_index': official_index_name,
                    '_type': doc_type,
                    '_id': data['id'],
                    '_source': data,
                }
                if routed:
                    action['routing'] = data["content_level"]
                bulk_actions.append(action)
            elasticsearch.helpers.bulk(es_cli, bulk_actions)

            return True
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    @classmethod
    def get_search_results(cls, es_cli, sub_index_name, query_body, offset=0, size=10,
                           auto_create_index=False, doc_type="_doc", aggregations_query=False,
                           is_suggest_request=False, batch_search=False, routing=None):
        """Run a (m)search against the read alias of ``sub_index_name``.

        :return: for a plain search, ``{"total_count", "hits"[, "aggregations"]}``
            (or the raw response for suggest requests); the raw msearch
            response for ``batch_search``; None when the index is missing and
            ``auto_create_index`` is False; ``{"total_count": 0, "hits": []}``
            on exception.
        """
        try:
            assert (es_cli is not None)
            official_index_name = cls.get_official_index_name(sub_index_name, "read")
            index_exists = es_cli.indices.exists(official_index_name)
            if not index_exists:
                if not auto_create_index:
                    logging.error("index:%s is not existing,get_search_results error!"
                                  % official_index_name)
                    return None
                else:
                    cls.create_index(es_cli, sub_index_name)
                    cls.put_index_mapping(es_cli, sub_index_name)

            logging.info("duan add,query_body:%s" % str(query_body).encode("utf-8"))

            if not batch_search:
                if not routing:
                    res = es_cli.search(index=official_index_name, doc_type=doc_type,
                                        body=query_body, from_=offset, size=size)
                else:
                    res = es_cli.search(index=official_index_name, doc_type=doc_type,
                                        body=query_body, from_=offset, size=size,
                                        routing=routing)
                if is_suggest_request:
                    return res
                else:
                    result_dict = {
                        "total_count": res["hits"]["total"],
                        "hits": res["hits"]["hits"]
                    }
                    if aggregations_query:
                        result_dict["aggregations"] = res["aggregations"]
                    return result_dict
            else:
                res = es_cli.msearch(body=query_body, index=official_index_name,
                                     doc_type=doc_type)
                logging.info("duan add,msearch res:%s" % str(res))
                return res
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return {"total_count": 0, "hits": []}

    @classmethod
    def get_analyze_results(cls, es_cli, sub_index_name, query_body):
        """Run the ``_analyze`` API against the read alias.

        :return: the analyze response, or None on failure/missing index.
        """
        try:
            assert (es_cli is not None)
            official_index_name = cls.get_official_index_name(sub_index_name, "read")
            index_exists = es_cli.indices.exists(official_index_name)
            if not index_exists:
                logging.error("index:%s is not existing,get_search_results error!"
                              % official_index_name)
                return None

            res = es_cli.indices.analyze(index=official_index_name, body=query_body)
            return res
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return None

    @classmethod
    def if_es_node_load_high(cls, es_cli):
        """Heuristic: report True when the cluster looks overloaded.

        Parses ``cat.nodes`` plain-text output; a node with CPU > 60 counts as
        "high", and more than 3 such nodes means overloaded.  Any parse/query
        failure conservatively returns True (assume overloaded).
        """
        try:
            assert (es_cli is not None)

            high_num = 0
            es_nodes_list = list()
            es_nodes_ori_info = es_cli.cat.nodes()
            es_nodes_info_list = es_nodes_ori_info.split("\n")

            for item in es_nodes_info_list:
                try:
                    item_list = item.split(" ")
                    # cat.nodes columns vary by version; the CPU column sits at
                    # index 4 (11 columns) or 3 (10 columns) — TODO confirm.
                    if len(item_list) == 11:
                        cpu_load = item_list[4]
                    elif len(item_list) == 10:
                        cpu_load = item_list[3]
                    else:
                        continue
                    int_cpu_load = int(cpu_load)
                    if int_cpu_load > 60:
                        high_num += 1
                    es_nodes_list.append(int_cpu_load)
                except Exception:
                    logging.error("catch exception,item:%s,err_msg:%s"
                                  % (str(item), traceback.format_exc()))
                    # NOTE(review): aborts on the first unparsable row and
                    # reports "high" — presumably intentional fail-safe.
                    return True

            if high_num > 3:
                logging.info("check es_nodes_load high,cpu load:%s,ori_cpu_info:%s" % (
                    str(es_nodes_list), str(es_nodes_info_list)))
                return True
            else:
                return False
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return True

    @classmethod
    def get_tag_topic_list(cls, tag_id, have_read_topic_id_list, size=100):
        """Fetch topic ids (content level 4-6) matching any of ``tag_id``.

        Results are boosted by content level (6 > 5 > 4), exclude already-read
        topics, and are routed to the level-4/5/6 shards.

        :return: list of topic ids (possibly empty).
        """
        try:
            functions_list = list()
            # Level boosting: higher content level gets a larger additive weight.
            functions_list += [
                {
                    "filter": {
                        "constant_score": {"filter": {"term": {"content_level": 6}}}
                    },
                    "weight": 60
                },
                {
                    "filter": {
                        "constant_score": {"filter": {"term": {"content_level": 5}}}
                    },
                    "weight": 50
                },
                {
                    "filter": {
                        "constant_score": {"filter": {"term": {"content_level": 4}}}
                    },
                    "weight": 40
                }
            ]
            q = {
                "query": {
                    "function_score": {
                        "query": {
                            "bool": {
                                "must": [
                                    {"range": {"content_level": {"gte": 4, "lte": 6}}},
                                    {"term": {"is_online": True}},
                                    {"term": {"is_deleted": False}},
                                    {"terms": {"tag_list": tag_id}}
                                ]
                            }
                        },
                        "boost_mode": "sum",
                        "score_mode": "sum",
                        "functions": functions_list
                    }
                },
                "_source": {
                    "include": ["id"]
                },
                "sort": [
                    {"_score": {"order": "desc"}},
                    {"create_time_val": {"order": "desc"}},
                    # {"language_type": {"order": "asc"}},
                ]
            }
            if len(have_read_topic_id_list) > 0:
                q["query"]["function_score"]["query"]["bool"]["must_not"] = {
                    "terms": {
                        "id": have_read_topic_id_list
                    }
                }
            result_dict = ESPerform.get_search_results(ESPerform.get_cli(),
                                                       sub_index_name="topic",
                                                       query_body=q,
                                                       offset=0, size=size,
                                                       routing="4,5,6")

            topic_id_list = [item["_source"]["id"] for item in result_dict["hits"]]
            logging.info("topic_id_list:%s" % str(topic_id_list))
            return topic_id_list
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return list()

    @classmethod
    def get_tag_topic_list_dict(cls, tag_id, have_read_topic_id_list, size=100):
        """Fetch level-6 topics for ``tag_id``, collapsed to one per user.

        Each matching tag adds weight 1; results exclude already-read topics
        and are routed to the level-6 shards.

        :return: ``(topic_id_list, {str(topic_id): user_id})`` — both empty on
            failure.
        """
        try:
            functions_list = list()
            for tid in tag_id:
                functions_list.append(
                    {
                        "filter": {"term": {"tag_list": tid}},
                        "weight": 1
                    }
                )
            q = {
                "query": {
                    "function_score": {
                        "query": {
                            "bool": {
                                "must": [
                                    {"term": {"content_level": 6}},
                                    {"term": {"is_online": True}},
                                    {"term": {"is_deleted": False}},
                                    {"terms": {"tag_list": tag_id}}
                                ]
                            }
                        },
                        "boost_mode": "sum",
                        "score_mode": "sum",
                        "functions": functions_list
                    }
                },
                "_source": {
                    "include": ["id", "user_id"]
                },
                "sort": [
                    {"_score": {"order": "desc"}},
                    {"create_time_val": {"order": "desc"}},
                    {"language_type": {"order": "asc"}},
                ],
                # At most one topic per author.
                "collapse": {
                    "field": "user_id"
                }
            }
            if len(have_read_topic_id_list) > 0:
                q["query"]["function_score"]["query"]["bool"]["must_not"] = {
                    "terms": {
                        "id": have_read_topic_id_list
                    }
                }
            result_dict = ESPerform.get_search_results(ESPerform.get_cli(),
                                                       sub_index_name="topic-high-star",
                                                       query_body=q,
                                                       offset=0, size=size,
                                                       routing="6")

            topic_id_list = [item["_source"]["id"] for item in result_dict["hits"]]
            topic_id_dict = dict()
            for item in result_dict["hits"]:
                topic_id_dict[str(item["_source"]["id"])] = item["_source"]["user_id"]

            logging.info("topic_id_list:%s" % str(topic_id_dict))
            return topic_id_list, topic_id_dict
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            # BUGFIX: must mirror the success path's 2-tuple so callers that
            # unpack two values do not crash on the error path.
            return list(), dict()


# Document fields compared between the old and new index.  ``drop_score`` and
# ``update_time_val`` were disabled (commented out) in the original script and
# stay excluded here.
_COMPARE_FIELDS = [
    "is_shadow", "content_level", "is_online", "is_deleted", "is_recommend",
    "is_complaint", "is_excellent", "is_operation_home_recommend", "vote_num",
    "reply_num", "user_id", "group_id", "share_num", "offline_score",
    "manual_score", "has_image", "has_video", "language_type",
    "virtual_content_level", "like_num_crawl", "comment_num_crawl",
    "platform", "platform_id", "sort_score", "create_time_val",
    "total_vote_num", "pictorial_id", "pick_id_list", "tag_list",
    "edit_tag_list", "tag_name_list", "name", "description",
    "user_nick_name", "user_nick_name_pre",
]


def _compare_topic_indices():
    """One-off check: fetch recent topics per content level, re-query each by
    id and print every field whose value differs between the two documents.
    """
    es_cli_obj = ESPerform.get_cli()

    for lev in [6, 5, 4, 3, 2, 1]:
        q = {
            "query": {
                "bool": {
                    "must": [
                        {"term": {"is_online": True}},
                        {"term": {"content_level": lev}},
                        {"range": {"update_time": {"gte": "2019-05-09T00:00:00+00:00"}}}
                    ]
                }
            }
        }
        result_dict = ESPerform.get_search_results(es_cli_obj, "topic", q, 0, 100)

        for old_item in result_dict["hits"]:
            old_source = old_item["_source"]
            old_id = old_source["id"]

            # BUGFIX: the original used a single ``term`` clause with two
            # fields, which is not a valid term query (one field only) —
            # expressed here as a bool/must of two term clauses instead.
            q = {
                "query": {
                    "bool": {
                        "must": [
                            {"term": {"id": old_id}},
                            {"term": {"is_online": True}}
                        ]
                    }
                }
            }
            result_dict_test = ESPerform.get_search_results(es_cli_obj, "topic", q, 0, 1)

            for new_item in result_dict_test["hits"]:
                new_source = new_item["_source"]
                print(new_source["id"])
                print("-----id-----")
                for field in _COMPARE_FIELDS:
                    if old_source[field] != new_source[field]:
                        print(old_source[field])
                        print(new_source[field])
                        print("-----%s-----" % field)


if __name__ == "__main__":
    _compare_topic_indices()