Commit ef5fe394 authored by Kai's avatar Kai

Merge branch 'test' of git.wanmeizhensuo.com:alpha/physical into test

parents 967de4fc d7e0b6c3
......@@ -17,7 +17,6 @@ class ESPerform(object):
cli_info_list = settings.ES_INFO_LIST
index_prefix = settings.ES_INDEX_PREFIX
@classmethod
def get_cli(cls):
try:
......@@ -32,7 +31,7 @@ class ESPerform(object):
return None
@classmethod
def get_official_index_name(cls,sub_index_name,index_flag=None):
def get_official_index_name(cls, sub_index_name, index_flag=None):
"""
:remark:get official es index name
:param sub_index_name:
......@@ -40,7 +39,7 @@ class ESPerform(object):
:return:
"""
try:
assert (index_flag in [None,"read","write"])
assert (index_flag in [None, "read", "write"])
official_index_name = cls.index_prefix + "-" + sub_index_name
if index_flag:
......@@ -52,11 +51,11 @@ class ESPerform(object):
return None
@classmethod
def __load_mapping(cls,doc_type):
def __load_mapping(cls, doc_type):
try:
mapping_file_path = os.path.join(
os.path.dirname(__file__),
'..', 'trans2es','mapping', '%s.json' % (doc_type,))
'..', 'trans2es', 'mapping', '%s.json' % (doc_type,))
mapping = ''
with open(mapping_file_path, 'r') as f:
for line in f:
......@@ -69,7 +68,7 @@ class ESPerform(object):
return None
@classmethod
def create_index(cls,es_cli,sub_index_name):
def create_index(cls, es_cli, sub_index_name):
"""
:remark: create es index,alias index
:param sub_index_name:
......@@ -82,11 +81,11 @@ class ESPerform(object):
index_exist = es_cli.indices.exists(official_index_name)
if not index_exist:
es_cli.indices.create(official_index_name)
read_alias_name = cls.get_official_index_name(sub_index_name,"read")
es_cli.indices.put_alias(official_index_name,read_alias_name)
read_alias_name = cls.get_official_index_name(sub_index_name, "read")
es_cli.indices.put_alias(official_index_name, read_alias_name)
write_alias_name = cls.get_official_index_name(sub_index_name,"write")
es_cli.indices.put_alias(official_index_name,write_alias_name)
write_alias_name = cls.get_official_index_name(sub_index_name, "write")
es_cli.indices.put_alias(official_index_name, write_alias_name)
return True
except:
......@@ -94,7 +93,7 @@ class ESPerform(object):
return False
@classmethod
def put_index_mapping(cls,es_cli,sub_index_name,mapping_type="_doc",force_sync=False):
def put_index_mapping(cls, es_cli, sub_index_name, mapping_type="_doc", force_sync=False):
"""
:remark: put index mapping
:param es_cli:
......@@ -105,13 +104,13 @@ class ESPerform(object):
try:
assert (es_cli is not None)
write_alias_name = cls.get_official_index_name(sub_index_name,"write")
write_alias_name = cls.get_official_index_name(sub_index_name, "write")
index_exist = es_cli.indices.exists(write_alias_name)
if not index_exist and not force_sync:
return False
mapping_dict = cls.__load_mapping(sub_index_name)
es_cli.indices.put_mapping(index=write_alias_name,body=mapping_dict,doc_type=mapping_type)
es_cli.indices.put_mapping(index=write_alias_name, body=mapping_dict, doc_type=mapping_type)
return True
except:
......@@ -119,7 +118,7 @@ class ESPerform(object):
return False
@classmethod
def put_indices_template(cls,es_cli,template_file_name, template_name):
def put_indices_template(cls, es_cli, template_file_name, template_name):
"""
:remark put index template
:param es_cli:
......@@ -131,7 +130,7 @@ class ESPerform(object):
assert (es_cli is not None)
mapping_dict = cls.__load_mapping(template_file_name)
es_cli.indices.put_template(name=template_name,body=mapping_dict)
es_cli.indices.put_template(name=template_name, body=mapping_dict)
return True
except:
......@@ -139,7 +138,7 @@ class ESPerform(object):
return False
@classmethod
def es_helpers_bulk(cls,es_cli,data_list,sub_index_name,auto_create_index=False,doc_type="_doc"):
def es_helpers_bulk(cls, es_cli, data_list, sub_index_name, auto_create_index=False, doc_type="_doc"):
try:
assert (es_cli is not None)
......@@ -150,12 +149,12 @@ class ESPerform(object):
logging.error("index:%s is not existing,bulk data error!" % official_index_name)
return False
else:
cls.create_index(es_cli,sub_index_name)
cls.put_index_mapping(es_cli,sub_index_name)
cls.create_index(es_cli, sub_index_name)
cls.put_index_mapping(es_cli, sub_index_name)
bulk_actions = []
if sub_index_name=="topic" or sub_index_name=="topic-star-routing":
if sub_index_name == "topic" or sub_index_name == "topic-star-routing":
for data in data_list:
if data:
bulk_actions.append({
......@@ -176,7 +175,7 @@ class ESPerform(object):
'_id': data['id'],
'_source': data,
})
elasticsearch.helpers.bulk(es_cli,bulk_actions)
elasticsearch.helpers.bulk(es_cli, bulk_actions)
return True
except:
......@@ -184,49 +183,50 @@ class ESPerform(object):
return False
@classmethod
def get_search_results(cls, es_cli,sub_index_name,query_body,offset=0,size=10,
auto_create_index=False,doc_type="_doc",aggregations_query=False,is_suggest_request=False,batch_search=False,routing=None):
def get_search_results(cls, es_cli, sub_index_name, query_body, offset=0, size=10,
auto_create_index=False, doc_type="_doc", aggregations_query=False, is_suggest_request=False,
batch_search=False, routing=None):
try:
assert (es_cli is not None)
official_index_name = cls.get_official_index_name(sub_index_name,"read")
official_index_name = cls.get_official_index_name(sub_index_name, "read")
index_exists = es_cli.indices.exists(official_index_name)
if not index_exists:
if not auto_create_index:
logging.error("index:%s is not existing,get_search_results error!" % official_index_name)
return None
else:
cls.create_index(es_cli,sub_index_name)
cls.put_index_mapping(es_cli,sub_index_name)
cls.create_index(es_cli, sub_index_name)
cls.put_index_mapping(es_cli, sub_index_name)
logging.info("duan add,query_body:%s" % str(query_body).encode("utf-8"))
if not batch_search:
if not routing:
res = es_cli.search(index=official_index_name,doc_type=doc_type,body=query_body,from_=offset,size=size)
res = es_cli.search(index=official_index_name, doc_type=doc_type, body=query_body, from_=offset,
size=size)
else:
res = es_cli.search(index=official_index_name, doc_type=doc_type, body=query_body, from_=offset,
size=size,routing=routing)
size=size, routing=routing)
if is_suggest_request:
return res
else:
result_dict = {
"total_count":res["hits"]["total"],
"hits":res["hits"]["hits"]
"total_count": res["hits"]["total"],
"hits": res["hits"]["hits"]
}
if aggregations_query:
result_dict["aggregations"] = res["aggregations"]
return result_dict
else:
res = es_cli.msearch(body=query_body,index=official_index_name, doc_type=doc_type)
res = es_cli.msearch(body=query_body, index=official_index_name, doc_type=doc_type)
logging.info("duan add,msearch res:%s" % str(res))
return res
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return {"total_count":0,"hits":[]}
return {"total_count": 0, "hits": []}
@classmethod
def if_es_node_load_high(cls, es_cli):
......@@ -240,9 +240,9 @@ class ESPerform(object):
for item in es_nodes_info_list:
try:
item_list = item.split(" ")
if len(item_list)==11:
if len(item_list) == 11:
cpu_load = item_list[4]
elif len(item_list)==10:
elif len(item_list) == 10:
cpu_load = item_list[3]
else:
continue
......@@ -251,11 +251,12 @@ class ESPerform(object):
high_num += 1
es_nodes_list.append(int_cpu_load)
except:
logging.error("catch exception,item:%s,err_msg:%s" % (str(item),traceback.format_exc()))
logging.error("catch exception,item:%s,err_msg:%s" % (str(item), traceback.format_exc()))
return True
if high_num > 3:
logging.info("check es_nodes_load high,cpu load:%s,ori_cpu_info:%s" % (str(es_nodes_list), str(es_nodes_info_list)))
logging.info("check es_nodes_load high,cpu load:%s,ori_cpu_info:%s" % (
str(es_nodes_list), str(es_nodes_info_list)))
return True
else:
return False
......@@ -264,7 +265,7 @@ class ESPerform(object):
return True
@classmethod
def get_tag_topic_list(cls,tag_id,have_read_topic_id_list,size=100):
def get_tag_topic_list(cls, tag_id, have_read_topic_id_list, size=100):
try:
functions_list = list()
for id in tag_id:
......@@ -289,8 +290,8 @@ class ESPerform(object):
}
]
q = {
"query":{
"function_score":{
"query": {
"function_score": {
"query": {
"bool": {
"must": [
......@@ -306,26 +307,26 @@ class ESPerform(object):
"functions": functions_list
}
},
"_source":{
"include":["id"]
"_source": {
"include": ["id"]
},
"sort":[
"sort": [
{"_score": {"order": "desc"}},
{"create_time_val":{"order":"desc"}},
{"language_type":{"order":"asc"}},
{"create_time_val": {"order": "desc"}},
{"language_type": {"order": "asc"}},
]
}
if len(have_read_topic_id_list)>0:
if len(have_read_topic_id_list) > 0:
q["query"]["function_score"]["query"]["bool"]["must_not"] = {
"terms":{
"id":have_read_topic_id_list
"terms": {
"id": have_read_topic_id_list
}
}
result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="topic", query_body=q,
offset=0, size=size,routing="4,5,6")
offset=0, size=size, routing="4,5,6")
topic_id_list = [item["_source"]["id"] for item in result_dict["hits"]]
logging.info("topic_id_list:%s"%str(topic_id_list))
logging.info("topic_id_list:%s" % str(topic_id_list))
return topic_id_list
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......
......@@ -15,7 +15,7 @@ class CeleryTaskRouter(object):
],
"majia-alpha": [
'majia.auto_instant_click.auto_reply',
]
],
}
# Map[TaskName, QueueName]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment