1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# -*- coding: UTF-8 -*-
import logging
import traceback
import json
import pymysql
import threading
import random
import datetime
from celery import shared_task
from django.conf import settings
from django.core import serializers
from trans2es.type_info import get_type_info_map
# from rpc.all import get_rpc_remote_invoker
from libs.es import ESPerform
from libs.cache import redis_client
from trans2es.models.face_user_contrast_similar import FaceUserContrastSimilar, UserSimilarScore
from linucb.utils.register_user_tag import RegisterUserTag
@shared_task
def write_to_es(es_type, pk_list, use_batch_query_set=False):
try:
pk_list = list(frozenset(pk_list))
if es_type == "register_user_tag":
RegisterUserTag.get_register_user_tag(pk_list)
elif es_type == "attention_user_tag":
RegisterUserTag.get_user_attention_tag(pk_list)
else:
type_info_map = get_type_info_map()
type_info = type_info_map[es_type]
logging.info("consume es_type:%s" % str(es_type))
type_info.insert_table_by_pk_list(
sub_index_name=es_type,
pk_list=pk_list,
use_batch_query_set=use_batch_query_set,
es=ESPerform.get_cli()
)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
@shared_task
def sync_face_similar_data_to_redis():
try:
result_items = FaceUserContrastSimilar.objects.filter(is_online=True, is_deleted=False).distinct().values(
"participant_user_id").values_list("participant_user_id", flat=True)
logging.info("begin sync_face_similar_data_to_redis!")
redis_key_prefix = "physical:user_similar:participant_user_id:"
for participant_user_id in result_items:
redis_key = redis_key_prefix + str(participant_user_id)
similar_result_items = FaceUserContrastSimilar.objects.filter(is_online=True, is_deleted=False,
participant_user_id=participant_user_id,
similarity__gt=0.3).order_by(
"-similarity")
item_list = list()
for item in similar_result_items:
weight_score = int(item.similarity * 100)
item_list.append(
{
"filter":{
"constant_score":{
"filter":{
"term":{"user_id": item.contrast_user_id}
}
}
},
"weight": weight_score*2
}
)
if len(item_list)>=100:
break
redis_client.set(redis_key, json.dumps(item_list))
logging.info("participant_user_id:%d set data done!" % participant_user_id)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
@shared_task
def sync_user_similar_score():
try:
results_items = UserSimilarScore.objects.filter(is_deleted=False).distinct().values("user_id").values_list(
"user_id", flat=True)
redis_key_prefix = "physical:user_similar_score:user_id:"
logging.info("begin sync user_similar_score!")
for user_id in results_items:
redis_key = redis_key_prefix + str(user_id)
similar_results_items = UserSimilarScore.objects.filter(is_deleted=False, user_id=user_id).order_by(
"-score")
item_list = list()
for item in similar_results_items:
contrast_user_id = item.contrast_user_id
score = item.score
item_list.append(
[contrast_user_id, score]
)
redis_client.set(redis_key, json.dumps(item_list))
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())