Commit 02dc52d7 authored by lixiaofang's avatar lixiaofang

add

parent 69a4a514
from injection.data_sync.tasks import write_to_es
result = write_to_es.delay('associate_tag', [1323], False)
print(result.read())
...@@ -71,4 +71,3 @@ class AgileTagRecommendType(BaseModel): ...@@ -71,4 +71,3 @@ class AgileTagRecommendType(BaseModel):
agile_tag_id = models.IntegerField(verbose_name=u'新标签', db_index=True) agile_tag_id = models.IntegerField(verbose_name=u'新标签', db_index=True)
agile_tag_type = models.CharField(verbose_name=u"标签推荐类型", max_length=3, choices=AGILE_TAG_RECOMMEND_TYPE) agile_tag_type = models.CharField(verbose_name=u"标签推荐类型", max_length=3, choices=AGILE_TAG_RECOMMEND_TYPE)
is_online = models.BooleanField(verbose_name=u"是否有效", default=True)
...@@ -9,7 +9,8 @@ import datetime ...@@ -9,7 +9,8 @@ import datetime
from celery import shared_task from celery import shared_task
from django.conf import settings from django.conf import settings
from django.core import serializers from django.core import serializers
from trans2es.type_info import get_type_info_map,TypeInfo from associate.type_info import get_type_info_map as get_type_info_associate
from trans2es.type_info import get_type_info_map
# from rpc.all import get_rpc_remote_invoker # from rpc.all import get_rpc_remote_invoker
from libs.es import ESPerform from libs.es import ESPerform
from libs.cache import redis_client from libs.cache import redis_client
...@@ -17,18 +18,33 @@ from libs.cache import redis_client ...@@ -17,18 +18,33 @@ from libs.cache import redis_client
@shared_task @shared_task
def write_to_es(es_type, pk_list, use_batch_query_set=False): def write_to_es(es_type, pk_list, use_batch_query_set=False):
logging.info("consume es_type:%s" % str(es_type))
try: try:
pk_list = list(frozenset(pk_list))
type_info_map = get_type_info_map()
type_info = type_info_map[es_type]
logging.info("consume es_type:%s" % str(es_type)) if es_type == "suggest_v1" or es_type == "suggest":
type_info.insert_table_by_pk_list( pk_list = list(frozenset(pk_list))
sub_index_name=type_info.name, type_info_map = get_type_info_map()
pk_list=pk_list, type_info = type_info_map[es_type]
use_batch_query_set=use_batch_query_set,
es=ESPerform.get_cli() type_info.insert_table_by_pk_list(
) sub_index_name=type_info.name,
pk_list=pk_list,
use_batch_query_set=use_batch_query_set,
es=ESPerform.get_cli()
)
else:
if es_type == "suggest_v1" or es_type == "suggest":
pk_list = list(frozenset(pk_list))
type_info_map = get_type_info_associate()
type_info = type_info_map[es_type]
type_info.insert_table_by_pk_list(
sub_index_name=type_info.name,
pk_list=pk_list,
use_batch_query_set=use_batch_query_set,
es=ESPerform.get_cli()
)
except: except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
\ No newline at end of file
...@@ -18,6 +18,8 @@ class CeleryTaskRouter(object): ...@@ -18,6 +18,8 @@ class CeleryTaskRouter(object):
for (queue, task_list) in queue_task_map.items() for (queue, task_list) in queue_task_map.items()
)) ))
logging.info("task_queue_map:%s" % task_queue_map)
def route_for_task(self, task, args=None, kwargs=None): def route_for_task(self, task, args=None, kwargs=None):
""" """
if settings.DEBUG: if settings.DEBUG:
......
...@@ -48,7 +48,7 @@ DEBUG = True ...@@ -48,7 +48,7 @@ DEBUG = True
# } # }
CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ROUTES = ['search_tips.celery_task_router.CeleryTaskRouter'] # CELERY_ROUTES = ['search_tips.celery_task_router.CeleryTaskRouter']
# Application definition # Application definition
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment