Commit 62339c2e authored by lixiaofang

add

parent 445ee197
...@@ -9,5 +9,7 @@ ...@@ -9,5 +9,7 @@
<config name="initializer_list"> <config name="initializer_list">
<element value="search_tips.django_init"/> <element value="search_tips.django_init"/>
<element value="search.view.auto_tips"/> <element value="search.view.auto_tips"/>
<element value="associate.search.views.auto_tips"/>
<element value="associate.search.views.hotword_results"/>
</config> </config>
</gm_rpcd_config> </gm_rpcd_config>
from django.contrib import admin
# Register your models here.
from django.apps import AppConfig
class Trans2EsConfig(AppConfig):
    """Django application configuration for the ``trans2es`` app."""

    name = 'trans2es'
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import logging
import traceback
import pypinyin
from pypinyin import lazy_pinyin
from gm_types.gaia import SERVICE_ITEM_PRICE_TYPE, DOCTOR_TYPE
import uuid
def uuid4():
    """Generate a random (version 4) UUID.

    :return: the UUID rendered as a 32-character hexadecimal string
    """
    random_uuid = uuid.uuid4()
    return random_uuid.hex
def _collect_suffix_suggestions(suggest_dict, full_word, partial_weight, full_weight,
                                partial_type, full_type, begin_weight, cur_index):
    """Add one suggestion entry per suffix of ``full_word`` to ``suggest_dict``.

    Entries are grouped by the first character of the (stripped) suffix.  The
    first suffix seen for a character creates the entry and receives the next
    ``cur_index``; later suffixes are appended to its ``input`` list and raise
    its ``word_weight`` / ``suggest_type`` when they score higher.

    :param suggest_dict: mapping of first character -> suggestion entry, mutated in place
    :param full_word: the complete word whose suffixes are enumerated
    :param partial_weight: weight factor for a suffix shorter than ``full_word``
    :param full_weight: weight factor for the suffix equal to ``full_word``
    :param partial_type: ``suggest_type`` for a shorter suffix
    :param full_type: ``suggest_type`` for the complete word
    :param begin_weight: extra factor applied to the suffix starting at index 0
    :param cur_index: last index handed out so far
    :return: the updated ``cur_index``
    """
    full_len = len(full_word)
    for i in range(full_len):
        term = full_word[i:].strip()
        # Skip empty suffixes and suffixes that start on a parenthesis.
        if not term or full_word[i] in ("(", ")"):
            continue

        is_full = len(term) == full_len
        prefix_weight = full_weight if is_full else partial_weight
        position_weight = begin_weight if i == 0 else 1.0
        suggest_item = {
            "input": [term],
            "word_weight": (1.0 * len(term) / full_len) * prefix_weight * position_weight,
            "suggest_type": full_type if is_full else partial_type,
        }

        existing = suggest_dict.get(term[0])
        if existing is None:
            cur_index += 1
            suggest_item["cur_index"] = cur_index
            suggest_dict[term[0]] = suggest_item
        else:
            existing["input"].append(term)
            if suggest_item["word_weight"] > existing["word_weight"]:
                existing["word_weight"] = suggest_item["word_weight"]
                existing["suggest_type"] = suggest_item["suggest_type"]
    return cur_index


def get_tips_suggest_list(instance_cn_name):
    """Build completion-suggest entries for a name.

    Suffixes are generated for the name itself, for its full pinyin
    transliteration (only when that differs from the name) and for its pinyin
    first-letter acronym (only when that differs from the full pinyin).

    ``suggest_type`` values: 0/1 name suffix / whole name, 2/3 pinyin suffix /
    whole pinyin, 4/5 acronym suffix / whole acronym.

    :param instance_cn_name: the name to generate suggestions for
    :return: list of dicts with keys ``input``, ``word_weight``,
        ``suggest_type`` and ``cur_index``; an empty list on any error
    """
    try:
        full_weight = 3.0 * 1000
        py_acronym_full_weight = 3.0 * 1000
        py_acronym_prefix_weight = 2
        ch_prefix_weight = 1.5
        py_prefix_weight = 1.0
        # Extra weight for a term that starts at the beginning of the word.
        begin_prefix_weight = 1.2 * 1000

        ch_full_word = instance_cn_name.strip()
        py_full_word = ''.join(lazy_pinyin(ch_full_word))
        py_acronym_full_word = ''.join(lazy_pinyin(ch_full_word, style=pypinyin.FIRST_LETTER))

        suggest_dict = dict()

        # Original (Chinese) text.
        cur_index = _collect_suffix_suggestions(
            suggest_dict, ch_full_word, ch_prefix_weight, full_weight,
            0, 1, begin_prefix_weight, 0)

        # Full pinyin.
        if py_full_word != ch_full_word:
            cur_index = _collect_suffix_suggestions(
                suggest_dict, py_full_word, py_prefix_weight, full_weight,
                2, 3, begin_prefix_weight, cur_index)

        # Pinyin acronym.
        if py_acronym_full_word != py_full_word:
            _collect_suffix_suggestions(
                suggest_dict, py_acronym_full_word, py_acronym_prefix_weight,
                py_acronym_full_weight, 4, 5, begin_prefix_weight, cur_index)

        # Return a real list so success and failure paths have the same type.
        return list(suggest_dict.values())
    except Exception:
        logging.error("catch exception,err_msg:%s" % traceback.format_exc())
        return list()
# coding=utf8
from __future__ import unicode_literals, absolute_import, print_function
import logging
import traceback
import json
from libs.cache import redis_client
class TagTab(object):
    """Scores an item from its search rate and its conversion rate.

    Each rate is multiplied by 100 and mapped to a tier score through a
    threshold table; the two tier scores are then combined using ``Weight``.
    Subclasses override ``SearchScore`` / ``TransScore`` to change the tiers.
    """

    # Contribution of each partial score to the final score.
    Weight = {
        'search': 0.4,
        'trans': 0.6
    }
    # Lower bound (rate * 100) -> tier score for the search rate.
    SearchScore = {
        0.7: 100,
        0.1: 80,
        0.05: 60,
        0.01: 40,
        0: 20,
    }
    # Lower bound (rate * 100) -> tier score for the conversion rate.
    TransScore = {
        0.7: 100,
        0.5: 80,
        0.3: 60,
        0.1: 40,
        0: 20
    }

    def cal_score(self, search_rate, conversion_rate):
        """Return the weighted sum of the search and conversion tier scores.

        :param search_rate: search rate as a fraction
        :param conversion_rate: conversion rate as a fraction
        :return: combined score as a float
        """
        search_part = self._cal_score(search_rate, 'SearchScore') * self.Weight['search']
        trans_part = self._cal_score(conversion_rate, 'TransScore') * self.Weight['trans']
        return search_part + trans_part

    def _cal_score(self, item, type_):
        """Map a rate to its tier score.

        :param item: the rate as a fraction; it is scaled by 100 before lookup
        :param type_: name of the threshold table attribute to use
            (``'SearchScore'`` or ``'TransScore'``)
        :return: the score of the highest tier whose lower bound is reached;
            the lowest tier's score when no bound is reached
        """
        item *= 100.0
        scoreweight = getattr(self, type_)
        for threshold in sorted(scoreweight, reverse=True):
            if item >= threshold:
                return scoreweight[threshold]
        # A negative or NaN rate reaches no threshold.  Previously this fell
        # through and returned None, which made cal_score raise TypeError.
        return scoreweight[min(scoreweight)]
class DocTab(TagTab):
    """``TagTab`` variant that overrides both threshold tables."""

    # Lower bound (rate * 100) -> tier score for the search rate.
    SearchScore = {
        0.04: 100,
        0.01: 80,
        0.001: 60,
        0.0002: 40,
        0: 20,
    }

    # Lower bound (rate * 100) -> tier score for the conversion rate.
    TransScore = {
        0.47: 100,
        0.2: 80,
        0.1: 60,
        0.01: 40,
        0: 20
    }
class HosTab(TagTab):
    """``TagTab`` variant that overrides both threshold tables."""

    # Lower bound (rate * 100) -> tier score for the search rate.
    SearchScore = {
        0.47: 100,
        0.2: 80,
        0.1: 60,
        0.01: 40,
        0: 20
    }

    # Lower bound (rate * 100) -> tier score for the conversion rate.
    # This table has ten tiers instead of five.
    TransScore = {
        1: 100,
        0.45: 90,
        0.27: 80,
        0.21: 70,
        0.15: 60,
        0.12: 50,
        0.09: 40,
        0.06: 30,
        0.04: 20,
        0: 10,
    }
class QueryWordAttr(object):
    """Holds the scorers and the Redis lookup for per-tag result counts."""

    # Latest date of the TagConversion table.
    tag_latest_date = None
    doctor_latest_date = None
    hospital_latest_date = None

    # Redis key prefix; the tag id is appended to form the full key.
    tips_num_redis_key_prefix = "query:associate_tip:tag_id:"

    tagtab = TagTab()
    doctab = DocTab()
    hostab = HosTab()

    @classmethod
    def get_query_results_num(cls, id):
        """Return the cached result count for a tag.

        :param id: tag id; it is appended to ``tips_num_redis_key_prefix``
        :return: the stored count as an int, or 0 when nothing is stored or
            the lookup fails
        """
        try:
            key = cls.tips_num_redis_key_prefix + str(id)
            results_num = redis_client.get(key)
            if results_num is None:
                # No value cached for this tag: an expected case, not an error.
                return 0
            return int(results_num)
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return 0
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import django.db.models
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
import traceback
import logging
import six
import sys
from libs.es import ESPerform
import trans2es.models as md
from libs.table_scan import TableSlicer,TableSlicerChunk
from trans2es.type_info import get_type_info_map,TypeInfo
from libs.cache import redis_client
import json
class Job(object):
    """Callable that pushes one table chunk into Elasticsearch."""

    # Elasticsearch client shared by all jobs; created on first use.
    __es = None

    def __init__(self, sub_index_name, type_name, chunk):
        """
        :param sub_index_name: index name passed on to ``insert_table_chunk``
        :param type_name: key into ``get_type_info_map()``
        :param chunk: the ``TableSlicerChunk`` to index
        """
        assert isinstance(sub_index_name, six.string_types)
        assert isinstance(type_name, six.string_types)
        assert isinstance(chunk, TableSlicerChunk)
        self._sub_index_name = sub_index_name
        self._type_name = type_name
        self._chunk = chunk

    @classmethod
    def get_es(cls):
        """Return the shared Elasticsearch client, creating it if needed."""
        if cls.__es is None:
            cls.__es = ESPerform().get_cli()
        return cls.__es

    def __call__(self):
        """Index the chunk.

        :return: whatever ``TypeInfo.insert_table_chunk`` returns.  It used to
            be computed and discarded; returning it lets callers log it, and
            callers that ignore the return value are unaffected.
        """
        type_info = get_type_info_map()[self._type_name]
        assert isinstance(type_info, TypeInfo)
        return type_info.insert_table_chunk(
            sub_index_name=self._sub_index_name,
            table_chunk=self._chunk,
            es=self.get_es(),
        )
class Command(BaseCommand):
    """Management command that dumps table data into Elasticsearch.

    Only the ``type`` option is read by ``handle``: ``all`` syncs every type
    from ``get_type_info_map()``, a concrete name syncs that type only, and an
    empty value does nothing.
    """

    args = ''
    help = 'dump data to elasticsearch, parallel'

    from optparse import make_option

    option_list = BaseCommand.option_list + (
        make_option('-t', '--type', dest='type', help='type name to dump data to elasticsearch', metavar='TYPE',default=''),
        make_option('-i', '--index-prefix', dest='index_prefix', help='index name to dump data to elasticsearch', metavar='INDEX_PREFIX'),
        make_option('-p', '--parallel', dest='parallel', help='parallel process count', metavar='PARALLEL'),
        make_option('-s', '--pks', dest='pks', help='specify sync pks, comma separated', metavar='PKS', default=''),
        make_option('--streaming-slicing', dest='streaming_slicing', action='store_true', default=True),
        make_option('--no-streaming-slicing', dest='streaming_slicing', action='store_false', default=True),
        make_option('-S', '--sync_type',dest='sync_type', help='sync data to es',metavar='TYPE',default='')
    )

    def __sync_data_by_type(self, type_name):
        """Slice the queryset of ``type_name`` into chunks and index each one.

        Errors are logged and swallowed so one failing type does not stop the
        others.
        """
        try:
            type_info = get_type_info_map()[type_name]
            query_set = type_info.queryset
            slicer = TableSlicer(queryset=query_set, chunk_size=type_info.bulk_insert_chunk_size)
            for chunk in slicer.chunks():
                job = Job(
                    sub_index_name=type_info.name,
                    type_name=type_name,
                    chunk=chunk,
                )
                job()
        except Exception:
            # ``except Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt / SystemExit still abort the command.
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())

    def handle(self, *args, **options):
        """Sync every type selected by the ``type`` option."""
        try:
            requested_type = options["type"]
            if not requested_type:
                return
            for type_name in get_type_info_map().keys():
                if requested_type == "all" or type_name == requested_type:
                    logging.info("begin sync [%s] data to es!" % type_name)
                    self.__sync_data_by_type(type_name)
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from django.conf import settings
from django.core.management.base import BaseCommand
import traceback
import logging
from libs.es import ESPerform
from trans2es.type_info import get_type_info_map
class Command(BaseCommand):
    """Management command that creates the ``associate_tag`` index/mapping
    and, optionally, uploads an index template.
    """

    args = ''
    help = 'dump mapping to elasticsearch'

    from optparse import make_option

    option_list = BaseCommand.option_list + (
        make_option('-t', '--type', dest='type', help='type name to dump data to elasticsearch', metavar='TYPE',
                    default=''),
        make_option('-T', '--indices_template', dest='indices_template',
                    help='index template name to dump data to elasticsearch', metavar='TYPE',
                    default=''),
        make_option('-i', '--index-prefix', dest='index_prefix', help='index name to dump data to elasticsearch',
                    metavar='INDEX_PREFIX'),
        make_option('-p', '--parallel', dest='parallel', help='parallel process count', metavar='PARALLEL'),
        make_option('-s', '--pks', dest='pks', help='specify sync pks, comma separated', metavar='PKS', default=''),
        make_option('--streaming-slicing', dest='streaming_slicing', action='store_true', default=True),
        make_option('--no-streaming-slicing', dest='streaming_slicing', action='store_false', default=True),
    )

    def handle(self, *args, **options):
        """Create the index if it is missing, push its mapping, then upload
        the index template named by ``indices_template`` (if any).

        The index/mapping step runs when ``type`` is ``all`` or
        ``associate_tag``.
        """
        try:
            es_cli = ESPerform.get_cli()

            type_name = "associate_tag"
            requested_type = options["type"]
            # The concrete type name is always used below.  Previously
            # ``-t all`` overwrote type_name with the literal "all", so an
            # index and mapping named "all" were requested.
            if requested_type and requested_type in ("all", type_name):
                official_index_name = ESPerform.get_official_index_name(type_name)
                index_exists = es_cli.indices.exists(official_index_name)
                if not index_exists:
                    logging.info("begin create [%s] index!" % type_name)
                    ESPerform.create_index(es_cli, type_name)

                logging.info("begin create [%s] mapping!" % type_name)
                ESPerform.put_index_mapping(es_cli, type_name)

            template_file_name = options["indices_template"]
            if template_file_name:
                if ESPerform.put_indices_template(es_cli=es_cli, template_file_name=template_file_name,
                                                  template_name=template_file_name):
                    logging.info("put indices template suc!")
                else:
                    logging.error("put indices template err!")
        except Exception:
            # ``except Exception`` so Ctrl-C / SystemExit are not swallowed.
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import django.db.models
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
import traceback
import logging
import six
import sys
from libs.es import ESPerform
import trans2es.models as md
from libs.table_scan import TableSlicer, TableSlicerChunk
from trans2es.type_info import get_type_info_map, TypeInfo, tips_attr_sync_to_redis_type_info_map
from libs.cache import redis_client
import json
class Command(BaseCommand):
    """Management command that runs each selected type's ``get_data_func``
    over every row of its queryset (used to sync tip attributes to Redis).
    """

    args = ''
    help = 'dump data to redis, parallel'

    from optparse import make_option

    option_list = BaseCommand.option_list + (
        make_option('-t', '--type', dest='type', help='type name to dump data to elasticsearch', metavar='TYPE',
                    default=''),
        make_option('-i', '--index-prefix', dest='index_prefix', help='index name to dump data to elasticsearch',
                    metavar='INDEX_PREFIX'),
        make_option('-p', '--parallel', dest='parallel', help='parallel process count', metavar='PARALLEL'),
        make_option('-s', '--pks', dest='pks', help='specify sync pks, comma separated', metavar='PKS', default=''),
        make_option('--streaming-slicing', dest='streaming_slicing', action='store_true', default=True),
        make_option('--no-streaming-slicing', dest='streaming_slicing', action='store_false', default=True),
        make_option('-S', '--sync_type', dest='sync_type', help='sync data to es', metavar='TYPE', default='')
    )

    def handle(self, *args, **options):
        """Process every type selected by the ``type`` option.

        ``all`` selects every entry of
        ``tips_attr_sync_to_redis_type_info_map()``; an empty value selects
        nothing.  Any error is logged and ends the run.
        """
        try:
            requested_type = options["type"]
            if not requested_type:
                return

            # Build the map once instead of once per type.
            type_info_map = tips_attr_sync_to_redis_type_info_map()
            for type_name in type_info_map.keys():
                if requested_type != "all" and type_name != requested_type:
                    continue

                logging.info("begin sync [%s] data to redis!" % type_name)
                type_info = type_info_map[type_name]
                query_set = type_info.queryset
                slicer = TableSlicer(queryset=query_set, chunk_size=type_info.bulk_insert_chunk_size)
                for chunk in slicer.chunks():
                    for instance in list(chunk):
                        logging.info("get instance:%s" % instance)
                        type_info.get_data_func(instance)
        except Exception:
            # ``except Exception`` so Ctrl-C / SystemExit are not swallowed.
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
{
"dynamic":"strict",
"properties": {
"id":{"type":"text"},
"suggest":{
"analyzer":"keyword",
"search_analyzer":"keyword",
"type":"completion",
"contexts":[
{
"name":"is_online",
"type": "category"
}
]
},
"suggest_type":{"type":"long"},// 0 - Chinese, 1 - Chinese full match, 2 - pinyin, 3 - pinyin full match, 4 - pinyin acronym, 5 - pinyin acronym full match
"agile_tag_type":{"type":"long"},// type the tips data belongs to: 0 - tag, 1 - hospital, 2 - doctor, 3 - wiki
"ori_name":{"type":"keyword"},// original name
"results_num":{"type":"integer"},// number of results
"is_online":{"type":"boolean"},// online flag
"offline_score":{"type":"double"},// offline score
"agile_tag_id":{"type":"long"},// tag ID
"create_tag_type":{"type":"long"},// tag creation type
"style":{"type":"long"},// new tag style
"topic_recommend_sort":{"type":"long"}
}
}
from django.db import models
# Create your models here.
from __future__ import unicode_literals, absolute_import, print_function
# !/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import traceback
import base64
from django.db import models
from gm_types.gaia import (
AGILE_TAG_TYPE,
AGILE_TAG_CREATE_TYPE,
AGILE_TAG_STYLE,
AGILE_TAG_RECOMMEND_TYPE,
)
class BaseModel(models.Model):
    """Abstract base model providing the online flag and both timestamps."""

    is_online = models.BooleanField(verbose_name=u"是否有效", default=True)
    create_time = models.DateTimeField(verbose_name=u'创建时间', auto_now_add=True)
    update_time = models.DateTimeField(verbose_name=u'更新时间', auto_now=True)

    class Meta:
        abstract = True
class AgileTag(BaseModel):
    """Tag record stored in the ``api_agile_tag`` table."""

    name = models.CharField(verbose_name=u'新标签名字', max_length=128, null=False, unique=True, default='')
    description = models.TextField(verbose_name=u'描述', default='')
    create_tag_type = models.CharField(verbose_name=u"标签创建类型", max_length=3, choices=AGILE_TAG_CREATE_TYPE)
    style = models.CharField(verbose_name=u"标签样式", max_length=3, choices=AGILE_TAG_STYLE)
    topic_recommend_sort = models.IntegerField(verbose_name=u'帖子推荐排序', default=9999)

    class Meta:
        verbose_name = u'新标签'
        db_table = 'api_agile_tag'
        app_label = 'api'
class AgileTagType(BaseModel):
    """Type assignment of a tag, stored in ``api_agile_tag_type``.

    ``agile_tag_id`` is a plain integer column, not a foreign key, so the
    related ``AgileTag`` row is looked up explicitly.
    """

    class Meta:
        verbose_name = u'新标签类型(可多选)'
        db_table = 'api_agile_tag_type'
        app_label = 'api'

    agile_tag_id = models.IntegerField(verbose_name=u'新标签', db_index=True)
    agile_tag_type = models.CharField(verbose_name=u"标签类型", max_length=3, choices=AGILE_TAG_TYPE)

    @property
    def get_by_id_name(self):
        """Return the ``AgileTag`` row with id ``agile_tag_id`` as a dict.

        :return: dict of the row's field values; ``{}`` when the row does not
            exist or the query fails (the error is logged)
        """
        try:
            results = AgileTag.objects.filter(id=self.agile_tag_id).values()
            return results[0]
        except Exception:
            # ``except Exception`` so KeyboardInterrupt / SystemExit propagate.
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return {}
# class AgileTagRecommendType(BaseModel):
# class Meta:
# verbose_name = u'新标签推荐类型(可多选)'
# db_table = 'api_agile_tag_recommend_type'
# app_label = 'api'
#
# agile_tag_id = models.IntegerField(verbose_name=u'新标签', db_index=True)
# agile_tag_type = models.CharField(verbose_name=u"标签推荐类型", max_length=3, choices=AGILE_TAG_RECOMMEND_TYPE)
#
# class AgileTagMapping(BaseModel):
# class Meta:
# verbose_name = u'新标签与老标签映射关系(可多选)'
# db_table = 'api_agile_tag_mapping'
# app_label = 'api'
#
# agile_tag_id = models.IntegerField(verbose_name=u'新标签', db_index=True)
# old_tag_id = models.IntegerField(verbose_name=u'老标签', db_index=True)
#
# class AgileTagRelationPolymer(BaseModel):
# class Meta:
# verbose_name = u'新标签关联的内容聚合页(可多选)'
# db_table = 'api_agile_tag_relation_polymer'
# app_label = 'api'
#
# agile_tag_id = models.IntegerField(verbose_name=u'新标签', db_index=True)
# polymer_id = models.IntegerField(verbose_name=u'聚合页id', db_index=True)
# !/usr/bin/env python
# encoding=utf-8
from __future__ import absolute_import
from django.contrib import admin
# Register your models here.
from __future__ import unicode_literals
from django.apps import AppConfig
class SearchConfig(AppConfig):
    """Django application configuration for the ``search`` app."""

    name = 'search'
from __future__ import unicode_literals
from django.db import models
# Create your models here.
from django.test import TestCase
# Create your tests here.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, absolute_import, print_function
import logging
import traceback
from libs.es import ESPerform
def get_suggest_tips(query, agile_tag_type):
    """Query the ``associate_tag`` completion suggester for ``query``.

    Hits are de-duplicated by ``ori_name``; the first hit for a name wins.

    :param query: prefix typed by the user
    :param agile_tag_type: currently unused
    :return: list of single-element lists, each holding a dict with keys
        ``ori_name``, ``results_num`` and ``highlight`` (``ori_name`` with
        every occurrence of ``query`` wrapped in ``<ems>`` tags); an empty
        list on any error
    """
    try:
        # The iOS input method sometimes injects U+2006 into the text.
        query = query.replace("\u2006", '')

        q = {
            "suggest": {
                "tips-suggest": {
                    "prefix": query,
                    "completion": {
                        "field": "suggest",
                        "size": 50,
                        "contexts": {
                            "is_online": [True]
                        },
                        "fuzzy": {
                            "fuzziness": 0
                        }
                    }
                }
            }
        }
        logging.info("get qqqqqqqq:%s" % q)

        have_read_tips_set = set()
        ret_list = list()
        result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="associate_tag", query_body=q,
                                                   offset=0, size=50, is_suggest_request=True)
        logging.info("get result_dict:%s" % result_dict)

        # Same for every hit, so build it once outside the loops.
        highlight_marks = u'<ems>%s</ems>' % query
        for tips_item in result_dict["suggest"]["tips-suggest"]:
            for hit_item in tips_item["options"]:
                source = hit_item["_source"]
                ori_name = source["ori_name"]
                if ori_name in have_read_tips_set:
                    continue
                have_read_tips_set.add(ori_name)

                highlight_name = ori_name.replace(query, highlight_marks)
                source["highlight_name"] = highlight_name
                results_num = source["results_num"]
                ret_list.append([{"ori_name": ori_name, "results_num": results_num, "highlight": highlight_name}])
        return ret_list
    except Exception:
        logging.error("catch exception,err_msg:%s" % traceback.format_exc())
        return list()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, absolute_import, print_function
from gm_rpcd.all import bind
import logging
import traceback
import json
from libs.es import ESPerform
from libs.cache import redis_client
from libs.tools import json_http_response
from django.shortcuts import render
from search.utils.auto_tips import get_suggest_tips
def auto_complete(request):
    """Auto-complete words/tags/doctors etc.

    URL:
        ~/search/auto_complete?scope=[kw]&q=双

    Reads ``q``, ``lat`` and ``lng`` from the query string; ``lat`` / ``lng``
    default to 0 when absent or empty.

    Return:
        {'error': 0|1, 'data': [word, word, word],}
    """
    try:
        q = request.GET.get('q', '').strip()
        # String defaults: the former int default 0 has no .strip(), so a
        # request without lat/lng always ended in the error branch.
        lat = request.GET.get('lat', '0').strip()
        lng = request.GET.get('lng', '0').strip()
        if not q:
            return json_http_response({'error': 0, 'data': []})

        data = get_suggest_tips(q, float(lat or 0), float(lng or 0))
        result = {
            'error': 0,
            'data': data,
        }
        # Let logging format the values; str(q).encode() raised on non-ASCII
        # input under Python 2 and logged a bytes repr under Python 3.
        logging.info("duan add,q is:%s,result:%s", q, result)
        return json_http_response(result)
    except Exception:
        logging.error("catch exception,err_msg:%s" % traceback.format_exc())
        return json_http_response({'error': 1, 'data': []})
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, absolute_import, print_function
from gm_rpcd.all import bind
import logging
import traceback
import json
from libs.es import ESPerform
from libs.cache import redis_client
from libs.tools import json_http_response
from associate.search.utils.auto_tips import get_suggest_tips
@bind("associate/search/auto_complete_query")
def auto_complete_query(query, agile_tag_type=-1):
    """Auto-complete words/tags/doctors etc. for the given prefix.

    :param query: prefix typed by the user
    :param agile_tag_type: passed through to ``get_suggest_tips``
    :return: ``{'error': 0, 'data': <suggestions>}`` on success,
        ``{'error': 1, 'data': []}`` when an exception is raised
    """
    try:
        data = get_suggest_tips(query, agile_tag_type)
        result = {
            'error': 0,
            'data': data,
        }
        return result
    except Exception:
        logging.error("catch exception,err_msg:%s" % traceback.format_exc())
        return {"error": 1, "data": []}
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, absolute_import, print_function
from gm_rpcd.all import bind
import logging
import traceback
from associate.commons.words_utils import QueryWordAttr
@bind("associate/search/tag_hotword_num")
def get_hotword_num(tag_id_list):
    """Look up the cached result count of every tag in ``tag_id_list``.

    :param tag_id_list: iterable of tag ids
    :return: ``{"tag_hotword_num": [{"tag_id": ..., "hot_num": ...}, ...]}``
        in input order; the list is empty when an exception is raised
    """
    try:
        tag_id_num = [
            {"tag_id": tag_id, "hot_num": QueryWordAttr.get_query_results_num(tag_id)}
            for tag_id in tag_id_list
        ]
        return {"tag_hotword_num": tag_id_num}
    except Exception:
        logging.error("catch exception,err_msg:%s" % traceback.format_exc())
        return {"tag_hotword_num": []}
from django.test import TestCase
# Create your tests here.
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
import time
import datetime
import logging
import traceback
import django.db.models
from django.conf import settings
from libs.es import ESPerform
import elasticsearch
import elasticsearch.helpers
import sys
import copy
from libs.es import ESPerform
from libs.tools import tzlc, getMd5Digest
from trans2es.models import agile_tag
from trans2es.utils.agile_tag_transfer import TagTransfer
from gm_types.gaia import (
AGILE_TAG_TYPE,
AGILE_TAG_CREATE_TYPE,
AGILE_TAG_STYLE,
AGILE_TAG_RECOMMEND_TYPE,
)
# Module-wide Elasticsearch client, created on first request.
__es = None


def get_elasticsearch_instance():
    """Return the module-wide Elasticsearch client, creating it on first use."""
    global __es
    if __es is not None:
        return __es
    __es = ESPerform.get_cli()
    return __es
def get_es_list_by_type(es_type):
    """Return the Elasticsearch clients for ``es_type``.

    ``es_type`` is not consulted: the result is always a one-element list
    holding the module-wide client.
    """
    client = get_elasticsearch_instance()
    return [client]
class TypeInfo(object):
    """Describes how one model is turned into Elasticsearch documents.

    Bundles the model, a deferred query, the per-instance (or batch)
    conversion function and the chunk sizes used when inserting.
    """

    def __init__(
            self,
            name,
            type,
            model,
            query_deferred,
            get_data_func,
            bulk_insert_chunk_size,
            round_insert_chunk_size,
            round_insert_period,
            batch_get_data_func=None,  # receive a list of pks, not instance
            logic_database_id=None,
    ):
        """
        :param name: name used in log messages and by callers as index name
        :param type: document type; key of the type-info maps
        :param model: Django model class
        :param query_deferred: zero-argument callable returning the query
        :param get_data_func: callable taking one instance and returning
            ``(item_dict, suggest_list)``
        :param bulk_insert_chunk_size: stored as-is for callers
        :param round_insert_chunk_size: stored as-is for callers
        :param round_insert_period: stored as-is for callers
        :param batch_get_data_func: optional callable taking a list of pks and
            returning the documents; takes precedence over ``get_data_func``
        :param logic_database_id: stored as-is for callers
        """
        self.name = name
        self.type = type
        self.model = model
        self.query_deferred = query_deferred
        self.get_data_func = get_data_func
        self.batch_get_data_func = batch_get_data_func
        self.pk_blacklist = ()
        self.bulk_insert_chunk_size = bulk_insert_chunk_size
        self.round_insert_chunk_size = round_insert_chunk_size
        self.round_insert_period = round_insert_period
        self.logic_database_id = logic_database_id

    @property
    def query(self):
        """The query produced by calling ``query_deferred``."""
        return self.query_deferred()

    @property
    def queryset(self):
        """A fresh ``QuerySet`` over ``model`` built from ``query``."""
        return django.db.models.QuerySet(model=self.model, query=self.query)

    @property
    def pk_blacklist(self):
        """Frozen set of primary keys that must never be indexed."""
        return self.__pk_blacklist

    @pk_blacklist.setter
    def pk_blacklist(self, value):
        self.__pk_blacklist = frozenset(value)

    def bulk_get_data(self, instance_iterable):
        """Convert instances into the list of documents to index.

        With ``batch_get_data_func`` set, the non-blacklisted pks are passed
        to it in one call.  Otherwise ``get_data_func`` is called per instance
        and one document is produced per suggestion item it returns.
        Failures are logged and skipped; they never propagate.

        :param instance_iterable: iterable of model instances
        :return: list of document dicts
        """
        data_list = []
        if self.batch_get_data_func:
            _pk_list = [getattr(instance, 'pk', None) for instance in instance_iterable]
            not_found_pk_list = []
            blacklisted_pk_list = []
            pk_list = []
            for pk in _pk_list:
                if pk is None:
                    not_found_pk_list.append(pk)
                elif pk in self.__pk_blacklist:
                    blacklisted_pk_list.append(pk)
                else:
                    pk_list.append(pk)
            if not_found_pk_list:
                # logging.error, not logging.exception: no exception is being
                # handled here, so there is no traceback to attach.
                logging.error('those pks not found for name={}, doc_type={}, pk_list={}'.format(
                    self.name,
                    self.type,
                    str(not_found_pk_list),
                ))
            if blacklisted_pk_list:
                logging.info('those pks are in blacklist for name={}, doc_type={}, pk_list={}'.format(
                    self.name,
                    self.type,
                    str(blacklisted_pk_list),
                ))
            try:
                data_list = self.batch_get_data_func(pk_list)
            except Exception:
                traceback.print_exc()
                logging.exception('bulk_get_data for name={}, doc_type={}, pk_list={}'.format(
                    self.name,
                    self.type,
                    str(pk_list),
                ))
        else:
            for instance in instance_iterable:
                pk = getattr(instance, 'pk', None)
                try:
                    if pk is None:
                        raise Exception('pk not found')
                    if pk in self.__pk_blacklist:
                        logging.info('bulk_get_data for name={}, doc_type={}, pk={}: ignore blacklisted pk'.format(
                            self.name,
                            self.type,
                            pk,
                        ))
                        continue
                    data = self.get_data_func(instance)
                    (item_dict, suggest_list) = data
                    for suggest_item in suggest_list:
                        # One document per suggestion entry; the id gets the
                        # entry's index as suffix to keep documents distinct.
                        suggest_dict = copy.deepcopy(item_dict)
                        suggest_dict["suggest_type"] = suggest_item["suggest_type"]
                        suggest_dict["offline_score"] = suggest_item["word_weight"]
                        suggest_dict["id"] = str(suggest_dict["id"]) + "_" + str(suggest_item["cur_index"])
                        suggest_dict["suggest"] = {
                            "input": suggest_item["input"],
                            "weight": int(suggest_dict["offline_score"]),
                            "contexts": {
                                "is_online": suggest_dict["is_online"]
                            }
                        }
                        data_list.append(suggest_dict)
                except Exception:
                    traceback.print_exc()
                    logging.exception('bulk_get_data for name={}, doc_type={}, pk={}'.format(
                        self.name,
                        self.type,
                        pk,
                    ))
        return data_list

    def elasticsearch_bulk_insert_data(self, sub_index_name, data_list, es=None):
        """Bulk-insert ready-made documents via ``ESPerform.es_helpers_bulk``.

        :return: the result of ``ESPerform.es_helpers_bulk``
        """
        return ESPerform.es_helpers_bulk(es, data_list, sub_index_name, True)

    def elasticsearch_bulk_insert(self, sub_index_name, instance_iterable, es=None):
        """Convert the instances and bulk-insert the resulting documents."""
        data_list = self.bulk_get_data(instance_iterable)
        return self.elasticsearch_bulk_insert_data(
            sub_index_name=sub_index_name,
            data_list=data_list,
            es=es,
        )

    def insert_table_by_pk_list(self, sub_index_name, pk_list, es=None, use_batch_query_set=False):
        """Index the rows whose primary key is in ``pk_list``.

        :param use_batch_query_set: when true, filter ``self.queryset``
            instead of ``model.objects.all()``
        """
        if use_batch_query_set:
            qs = self.queryset
        else:
            qs = self.model.objects.all()
        instance_list = qs.filter(pk__in=pk_list)
        data_list = self.bulk_get_data(instance_list)
        logging.info("get data_list:%s" % data_list)
        self.elasticsearch_bulk_insert_data(
            sub_index_name=sub_index_name,
            data_list=data_list,
            es=es,
        )

    def insert_table_chunk(self, sub_index_name, table_chunk, es=None):
        """Index one table chunk and return a one-line timing report.

        :param sub_index_name: index name passed to ``ESPerform.es_helpers_bulk``
        :param table_chunk: chunk providing the instances and the
            ``get_pk_start()`` / ``get_pk_stop()`` bounds
        :param es: Elasticsearch client
        :return: formatted string with timestamps, pk range, instance count,
            the duration of each stage and the bulk response
        """
        # time.clock() was removed in Python 3.8; use process_time() where it
        # exists and fall back to clock() on older interpreters.
        cpu_clock = getattr(time, "process_time", None) or time.clock

        start_clock = cpu_clock()
        start_time = time.time()
        instance_list = list(table_chunk)
        stage_1_time = time.time()
        data_list = self.bulk_get_data(instance_list)
        stage_2_time = time.time()
        es_result = ESPerform.es_helpers_bulk(
            es_cli=es,
            data_list=data_list,
            sub_index_name=sub_index_name,
            auto_create_index=True
        )
        stage_3_time = time.time()
        end_clock = cpu_clock()
        return ('{datetime} {index_prefix} {type_name:10s} {pk_start:>15s} {pk_stop:>15s} {count:5d} '
                '{stage_1_duration:6.3f} {stage_2_duration:6.3f} {stage_3_duration:6.3f} {clock_duration:6.3f} '
                '{response}').format(
            datetime=datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f'),
            index_prefix=sub_index_name,
            type_name=self.name,
            pk_start=repr(table_chunk.get_pk_start()),
            pk_stop=repr(table_chunk.get_pk_stop()),
            count=len(instance_list),
            stage_1_duration=stage_1_time - start_time,
            stage_2_duration=stage_2_time - stage_1_time,
            stage_3_duration=stage_3_time - stage_2_time,
            clock_duration=end_clock - start_clock,
            response=es_result,
        )
# Cache for get_type_info_map(); filled on the first call.
_get_type_info_map_result = None


def get_type_info_map():
    """Return the mapping of document type -> ``TypeInfo`` used for indexing.

    The mapping is built on the first call and the same dict is returned on
    later calls.
    """
    global _get_type_info_map_result
    if not _get_type_info_map_result:
        associate_tag_info = TypeInfo(
            name='associate_tag',
            type='associate_tag',
            model=agile_tag.AgileTagType,
            query_deferred=lambda: agile_tag.AgileTagType.objects.filter(
                agile_tag_type__in=[AGILE_TAG_TYPE.PROJECT, AGILE_TAG_TYPE.POSITION, AGILE_TAG_TYPE.BRAND,
                                    AGILE_TAG_TYPE.INSTRUMENT, AGILE_TAG_TYPE.HOSPITAL, AGILE_TAG_TYPE.DOCTOR]).query,
            get_data_func=TagTransfer.get_tag_suggest_data_list,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        )
        built_map = {}
        for type_info in [associate_tag_info]:
            built_map[type_info.type] = type_info
        _get_type_info_map_result = built_map
    return _get_type_info_map_result
# Dedicated cache for tips_attr_sync_to_redis_type_info_map().  It must not
# share the cache of get_type_info_map(): with a shared cache, whichever
# function ran first decided what both of them returned.
_tips_attr_sync_to_redis_type_info_map_result = None


def tips_attr_sync_to_redis_type_info_map():
    """Return the mapping of type -> ``TypeInfo`` used for the Redis sync.

    The mapping is built on the first call and cached for later calls.
    """
    global _tips_attr_sync_to_redis_type_info_map_result
    if _tips_attr_sync_to_redis_type_info_map_result:
        return _tips_attr_sync_to_redis_type_info_map_result

    type_info_list = [
        TypeInfo(
            name='associate',
            type='associate_results_num',  # api_wordrelresemble
            model=agile_tag.AgileTagType,
            query_deferred=lambda: agile_tag.AgileTagType.objects.filter(
                agile_tag_type__in=[AGILE_TAG_TYPE.PROJECT, AGILE_TAG_TYPE.POSITION, AGILE_TAG_TYPE.BRAND,
                                    AGILE_TAG_TYPE.INSTRUMENT, AGILE_TAG_TYPE.HOSPITAL, AGILE_TAG_TYPE.DOCTOR]).query,
            get_data_func=TagTransfer.set_data_to_redis,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        )
    ]

    type_info_map = {
        type_info.type: type_info
        for type_info in type_info_list
    }

    _tips_attr_sync_to_redis_type_info_map_result = type_info_map
    logging.info("get type_info_map:%s" % type_info_map)
    return type_info_map
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import logging
import traceback
from libs.tools import tzlc, getMd5Digest
from libs.es import ESPerform
from libs.cache import redis_client
import json
import base64
from trans2es.models import agile_tag
from trans2es.commons.commons import get_tips_suggest_list
from django.conf import settings
from trans2es.commons.words_utils import QueryWordAttr
class TagTransfer(object):
    """Converts ``AgileTagType`` rows into suggest documents and syncs the
    per-tag result counts to Redis.
    """

    @classmethod
    def get_tag_suggest_data_list(cls, instance):
        """Build the base document and the suggestion entries for one tag.

        :param instance: object exposing ``get_by_id_name``, ``agile_tag_id``
            and ``agile_tag_type``
        :return: ``(item_dict, suggest_list)``; ``([], [])`` on any error
        """
        try:
            ret_list = list()
            results = instance.get_by_id_name
            name = results["name"].strip()

            item_dict = dict()
            item_dict["id"] = getMd5Digest(str(name))
            item_dict["ori_name"] = name
            item_dict["is_online"] = results["is_online"]
            item_dict["results_num"] = QueryWordAttr.get_query_results_num(instance.agile_tag_id)
            item_dict["agile_tag_type"] = instance.agile_tag_type
            item_dict["agile_tag_id"] = instance.agile_tag_id
            item_dict["create_tag_type"] = results["create_tag_type"]
            item_dict["style"] = results["style"]
            item_dict["topic_recommend_sort"] = results["topic_recommend_sort"]
            item_dict["offline_score"] = 0.0
            ret_list.append(item_dict)

            suggest_list = get_tips_suggest_list(name)
            logging.info("get ret_list:%s" % ret_list)
            # Log the suggestions themselves; this line used to print
            # ret_list a second time.
            logging.info("get suggest_list:%s" % suggest_list)
            return (item_dict, suggest_list)
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return ([], [])

    @classmethod
    def get_tractate_data_name_mapping_results_to_redis(cls, name):
        """Count the online ``tractate`` documents matching a tag name.

        :param name: tag name to search for
        :return: the ``total_count`` reported by the search; 0 for an empty
            name (no search is issued); ``None`` when the search fails
        """
        try:
            tag_name = name
            if not tag_name:
                # Without a name there is nothing to match; do not send an
                # unconstrained query.
                return 0

            multi_fields = {
                "content": 6,
                "tractate_tag_name": 3,
                "tractate_tag_name_content": 4,
                "author": 2, }
            query_fields = ['^'.join((k, str(v))) for (k, v) in multi_fields.items()]
            multi_match = {
                'query': tag_name,
                'type': 'cross_fields',
                'operator': 'and',
                'fields': query_fields,
            }
            q = dict()
            q['query'] = {
                'bool': {
                    "should": [
                        {'multi_match': multi_match}
                    ],
                    "must": [
                        {"term": {"is_online": True}}
                    ],
                    "minimum_should_match": 1
                }
            }
            result_dict = ESPerform.get_search_results(ESPerform.get_cli(settings.GM_ORI_ES_INFO_LIST),
                                                       sub_index_name="tractate", doc_type="tractate", query_body=q,
                                                       offset=0, size=0)
            return result_dict["total_count"]
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return None

    @classmethod
    def set_data_to_redis(cls, instance):
        """Store the tractate count of the instance's tag in Redis.

        The key is ``query:associate_tip:tag_id:<agile_tag_id>``.  Nothing is
        written when the count could not be determined.

        :param instance: object exposing ``agile_tag_id``
        """
        try:
            query_key = "query:associate_tip:tag_id:"
            name = agile_tag.AgileTag.objects.filter(id=instance.agile_tag_id).values_list("name", flat=True)[0]
            tractate_results = cls.get_tractate_data_name_mapping_results_to_redis(name)
            if tractate_results is None:
                # The count lookup failed and was already logged; keep
                # whatever value is currently cached instead of storing None.
                return
            key = query_key + str(instance.agile_tag_id)
            redis_client.set(key, tractate_results)
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
from django.shortcuts import render
# Create your views here.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment