# tasks.py 2.18 KB
# -*- coding: UTF-8 -*-

from celery import shared_task
from django.conf import settings
from django.core import serializers
from trans2es.type_info import get_type_info_map
#from rpc.all import get_rpc_remote_invoker
from libs.es import ESPerform
import logging
import traceback
from libs.cache import redis_client
from trans2es.models.face_user_contrast_similar import FaceUserContrastSimilar
import json

@shared_task
def write_to_es(es_type, pk_list, use_batch_query_set=False):
    """Celery task: write the rows identified by ``pk_list`` to Elasticsearch.

    The type-info entry registered under ``es_type`` does the actual work
    through its ``insert_table_by_pk_list`` method; this task only
    de-duplicates the keys and supplies the Elasticsearch client.

    Args:
        es_type: Key into the map returned by ``get_type_info_map()``. It is
            also forwarded as ``sub_index_name``.
        pk_list: Iterable of hashable primary keys. Duplicates are removed
            before the write, so the incoming order is not preserved.
        use_batch_query_set: Forwarded unchanged to
            ``insert_table_by_pk_list``.

    Returns:
        None. Any failure is logged with its traceback and not re-raised,
        so the task is best-effort.
    """
    try:
        unique_pks = list(frozenset(pk_list))
        type_info = get_type_info_map()[es_type]

        logging.info("duan add,es_type:%s", es_type)
        type_info.insert_table_by_pk_list(
            sub_index_name=es_type,
            pk_list=unique_pks,
            use_batch_query_set=use_batch_query_set,
            es=ESPerform.get_cli()
        )
    except Exception:
        # Catch Exception rather than everything so that SystemExit and
        # KeyboardInterrupt still propagate to the worker.
        logging.error("catch exception,err_msg:%s", traceback.format_exc())


@shared_task
def sync_face_similar_data_to_redis():
    """Celery task: cache each participant's most similar users in Redis.

    For every distinct ``participant_user_id`` among the
    ``FaceUserContrastSimilar`` rows that are online and not deleted, the
    task stores a JSON list under the key
    ``physical:user_similar:participant_user_id:<id>``. Each list element is
    ``{"contrast_user_id": ..., "similarity": ...}``; the list is ordered by
    descending similarity, holds only rows with similarity above 0.4, and is
    capped at 100 entries.

    Returns:
        None. Any failure is logged with its traceback and not re-raised;
        keys written before the failure stay in Redis.
    """
    min_similarity = 0.4
    max_matches = 100
    try:
        participant_ids = FaceUserContrastSimilar.objects.filter(
            is_online=True,
            is_deleted=False
        ).values_list("participant_user_id", flat=True).distinct()

        logging.info("duan add,begin sync_face_similar_data_to_redis!")

        redis_key_prefix = "physical:user_similar:participant_user_id:"
        for participant_user_id in participant_ids:
            redis_key = redis_key_prefix + str(participant_user_id)

            # Django QuerySets have no .limit() method; slicing is how a
            # LIMIT clause is expressed.
            similar_rows = FaceUserContrastSimilar.objects.filter(
                is_online=True,
                is_deleted=False,
                participant_user_id=participant_user_id,
                similarity__gt=min_similarity
            ).order_by("-similarity")[:max_matches]

            item_list = [
                {
                    "contrast_user_id": row.contrast_user_id,
                    "similarity": row.similarity
                }
                for row in similar_rows
            ]
            redis_client.set(redis_key, json.dumps(item_list))

            # %s instead of %d: an id that is not a number must not abort
            # the remaining participants with a formatting TypeError.
            logging.info(
                "duan add,participant_user_id:%s set data done!",
                participant_user_id
            )
    except Exception:
        # Catch Exception rather than everything so that SystemExit and
        # KeyboardInterrupt still propagate to the worker.
        logging.error("catch exception,err_msg:%s", traceback.format_exc())