Commit fafb0d5c authored by 段英荣's avatar 段英荣

add topic-routing

parent 2d47821b
...@@ -183,7 +183,7 @@ class ESPerform(object): ...@@ -183,7 +183,7 @@ class ESPerform(object):
@classmethod @classmethod
def get_search_results(cls, es_cli,sub_index_name,query_body,offset=0,size=10, def get_search_results(cls, es_cli,sub_index_name,query_body,offset=0,size=10,
auto_create_index=False,doc_type="_doc",aggregations_query=False,is_suggest_request=False,batch_search=False): auto_create_index=False,doc_type="_doc",aggregations_query=False,is_suggest_request=False,batch_search=False,routing=None):
try: try:
assert (es_cli is not None) assert (es_cli is not None)
...@@ -200,7 +200,11 @@ class ESPerform(object): ...@@ -200,7 +200,11 @@ class ESPerform(object):
logging.info("duan add,query_body:%s" % str(query_body).encode("utf-8")) logging.info("duan add,query_body:%s" % str(query_body).encode("utf-8"))
if not batch_search: if not batch_search:
if not routing:
res = es_cli.search(index=official_index_name,doc_type=doc_type,body=query_body,from_=offset,size=size) res = es_cli.search(index=official_index_name,doc_type=doc_type,body=query_body,from_=offset,size=size)
else:
res = es_cli.search(index=official_index_name, doc_type=doc_type, body=query_body, from_=offset,
size=size,routing=routing)
if is_suggest_request: if is_suggest_request:
return res return res
...@@ -256,3 +260,33 @@ class ESPerform(object): ...@@ -256,3 +260,33 @@ class ESPerform(object):
except: except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return True return True
@classmethod
def get_tag_topic_list(cls,tag_id):
try:
q = {
"query":{
"bool":{
"must":[
{"term":{"is_online": True}},
{"term":{"is_deleted": False}},
{"term":{"tag_list":tag_id}}
]
}
},
"_source":{
"include":["id"]
},
"sort":[
{"create_time_val":{"order":"desc"}},
{"language_type":{"order":"asc"}},
]
}
result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="topic", query_body=q,
offset=0, size=5000,routing="4,5")
topic_id_list = [item["_source"]["id"] for item in result_dict["hits"]]
return topic_id_list
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return list()
...@@ -4,6 +4,10 @@ ...@@ -4,6 +4,10 @@
from django.conf import settings from django.conf import settings
from pytz import timezone from pytz import timezone
from datetime import datetime from datetime import datetime
import traceback
from libs.cache import redis_client
import json
import logging
def tzlc(dt, truncate_to_sec=True): def tzlc(dt, truncate_to_sec=True):
...@@ -16,3 +20,24 @@ def tzlc(dt, truncate_to_sec=True): ...@@ -16,3 +20,24 @@ def tzlc(dt, truncate_to_sec=True):
return timezone(settings.TIME_ZONE).localize(dt) return timezone(settings.TIME_ZONE).localize(dt)
else: else:
return timezone(settings.TIME_ZONE).normalize(dt) return timezone(settings.TIME_ZONE).normalize(dt)
def get_have_read_topic_id_list(device_id,user_id,query_type):
try:
if user_id==-1:
redis_key = "physical:home_recommend" + ":device_id:" + str(device_id) + ":query_type:" + str(query_type)
else:
redis_key = "physical:home_recommend" + ":user_id:" + str(user_id) + ":query_type:" + str(query_type)
have_read_topic_id_list = list()
redis_field_list = [b'have_read_topic_list']
redis_field_val_list = redis_client.hmget(redis_key, redis_field_list)
if redis_field_val_list[0]:
have_read_topic_id_list = list(json.loads(redis_field_val_list[0]))
return have_read_topic_id_list
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return list()
\ No newline at end of file
# -*- coding: UTF-8 -*-
# !/usr/bin/env python
import numpy as np
import redis
from libs.cache import redis_client
import logging
import traceback
import json
import pickle
from django.conf import settings
from trans2es.models.tag import AccountUserTag
from libs.es import ESPerform
import libs.tools as Tools
from search.utils.common import *
class RegisterUserTag(object):
linucb_device_id_matrix_redis_prefix = "physical:linucb:device_id:"
linucb_device_id_recommend_redis_prefix = "physical:linucb:tag_recommend:device_id:"
linucb_device_id_recommend_topic_id_prefix = "physical:linucb:topic_recommend:device_id:"
tag_topic_id_redis_prefix = "physical:tag_id:topic_id_list:"
linucb_user_id_matrix_redis_prefix = "physical:linucb:user_id:"
linucb_user_id_recommend_redis_prefix = "physical:linucb:tag_recommend:user_id:"
linucb_user_id_recommend_topic_id_prefix = "physical:linucb:topic_recommend:user_id:"
linucb_device_id_register_tag_topic_id_prefix = "physical:linucb:register_tag_topic_recommend:device_id:"
linucb_user_id_register_tag_topic_id_prefix = "physical:linucb:register_tag_topic_recommend:user_id:"
@classmethod
def get_register_user_tag(cls,pk_list):
try:
user_id_set = set()
query_results = AccountUserTag.objects.filter(pk__in=pk_list)
for item in query_results:
tag_id = item.tag_id
user_id = item.user
if user_id not in user_id_set:
user_id_set.add(user_id)
user_tag_list = AccountUserTag.objects.filter(user=user_id).values_list("tag_id",flat=True)
have_read_topic_id_list = Tools.get_have_read_topic_id_list(-1, user_id,
TopicPageType.HOME_RECOMMEND)
recommend_topic_id_list = list()
cycle_num = int(10000/len(user_tag_list))
for index in range(0,cycle_num):
for tag_id in user_tag_list:
redis_tag_id_key = cls.tag_topic_id_redis_prefix + str(tag_id)
redis_tag_id_data = redis_client.get(redis_tag_id_key)
tag_topic_id_list = json.loads(redis_tag_id_data) if redis_tag_id_data else []
if not redis_tag_id_data:
tag_topic_id_list = ESPerform.get_tag_topic_list(tag_id)
redis_client.set(redis_tag_id_key,json.dumps(tag_topic_id_list))
redis_client.expire(redis_tag_id_key,1*24*60*60)
if len(tag_topic_id_list)>index:
for topic_id in tag_topic_id_list[index:]:
if topic_id not in have_read_topic_id_list and topic_id not in recommend_topic_id_list:
recommend_topic_id_list.append(topic_id)
break
redis_register_tag_topic_data = {
"data": json.dumps(recommend_topic_id_list),
"cursor": 0
}
redis_client.hmset(cls.linucb_user_id_register_tag_topic_id_prefix,redis_register_tag_topic_data)
redis_client.expire(cls.linucb_user_id_register_tag_topic_id_prefix,30*24*60*60)
topic_recommend_redis_key = cls.linucb_user_id_recommend_topic_id_prefix + str(user_id)
redis_data_dict = {
"data": json.dumps(recommend_topic_id_list),
"cursor":0
}
redis_client.hmset(topic_recommend_redis_key,redis_data_dict)
redis_client.expire(topic_recommend_redis_key,30*24*60*60)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
# -*- coding: UTF-8 -*-
# !/usr/bin/env python
import numpy as np
import redis
from libs.cache import redis_client
import logging
import traceback
import json
import pickle
from django.conf import settings
from trans2es.models.tag import CommunityTagFollow
from libs.es import ESPerform
import libs.tools as Tools
from search.utils.common import *
class UserAttentionTag(object):
linucb_device_id_matrix_redis_prefix = "physical:linucb:device_id:"
linucb_device_id_recommend_redis_prefix = "physical:linucb:tag_recommend:device_id:"
linucb_device_id_recommend_topic_id_prefix = "physical:linucb:topic_recommend:device_id:"
tag_topic_id_redis_prefix = "physical:tag_id:topic_id_list:"
linucb_user_id_matrix_redis_prefix = "physical:linucb:user_id:"
linucb_user_id_recommend_redis_prefix = "physical:linucb:tag_recommend:user_id:"
linucb_user_id_recommend_topic_id_prefix = "physical:linucb:topic_recommend:user_id:"
linucb_user_id_attention_tag_topic_id_prefix = "physical:linucb:attention_tag_topic_recommend:user_id:"
@classmethod
def get_register_user_tag(cls,pk_list):
try:
user_id_set = set()
query_results = CommunityTagFollow.objects.filter(pk__in=pk_list)
for item in query_results:
tag_id = item.tag_id
user_id = item.user_id
if user_id not in user_id_set:
user_id_set.add(user_id)
user_tag_list = CommunityTagFollow.objects.filter(user_id=user_id).order_by("-create_time").values_list("tag_id",flat=True)
have_read_topic_id_list = Tools.get_have_read_topic_id_list(-1, user_id,
TopicPageType.HOME_RECOMMEND)
recommend_topic_id_list = list()
cycle_num = int(10000/len(user_tag_list))
for index in range(0,cycle_num):
for tag_id in user_tag_list:
redis_tag_id_key = cls.tag_topic_id_redis_prefix + str(tag_id)
redis_tag_id_data = redis_client.get(redis_tag_id_key)
tag_topic_id_list = json.loads(redis_tag_id_data) if redis_tag_id_data else []
if not redis_tag_id_data:
tag_topic_id_list = ESPerform.get_tag_topic_list(tag_id)
redis_client.set(redis_tag_id_key,json.dumps(tag_topic_id_list))
redis_client.expire(redis_tag_id_key,1*24*60*60)
if len(tag_topic_id_list)>index:
for topic_id in tag_topic_id_list[index:]:
if topic_id not in have_read_topic_id_list and topic_id not in recommend_topic_id_list:
recommend_topic_id_list.append(topic_id)
break
redis_register_tag_topic_data = {
"data": json.dumps(recommend_topic_id_list),
"cursor": 0
}
redis_client.hmset(cls.linucb_user_id_attention_tag_topic_id_prefix,redis_register_tag_topic_data)
redis_client.expire(cls.linucb_user_id_attention_tag_topic_id_prefix,30*24*60*60)
topic_recommend_redis_key = cls.linucb_user_id_recommend_topic_id_prefix + str(user_id)
redis_recommend_topic_dict = redis_client.hgetall(topic_recommend_redis_key)
if len(redis_recommend_topic_dict)==0:
redis_data_dict = {
"data": json.dumps(recommend_topic_id_list),
"cursor":0
}
redis_client.hmset(topic_recommend_redis_key,redis_data_dict)
redis_client.expire(topic_recommend_redis_key,30*24*60*60)
else:
ori_recommend_topic_id_list = json.loads(redis_recommend_topic_dict["data"])
ori_recommend_cursor = redis_recommend_topic_dict["cursor"]
ori_index = 0
for new_recommend_index in range(0,len(recommend_topic_id_list),2):
pass
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
...@@ -11,7 +11,7 @@ import traceback ...@@ -11,7 +11,7 @@ import traceback
from django.conf import settings from django.conf import settings
from libs.es import ESPerform from libs.es import ESPerform
from search.utils.common import * from search.utils.common import *
import libs.tools as Tools
class KafkaManager(object): class KafkaManager(object):
consumser_obj = None consumser_obj = None
...@@ -49,35 +49,6 @@ class CollectData(object): ...@@ -49,35 +49,6 @@ class CollectData(object):
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return dict() return dict()
def get_tag_topic_list(self,tag_id):
try:
q = {
"query":{
"bool":{
"must":[
{"term":{"is_online": True}},
{"term":{"is_deleted": False}},
{"term":{"tag_list":tag_id}}
]
}
},
"_source":{
"include":["id"]
},
"sort":[
{"create_time_val":{"order":"desc"}},
{"language_type":{"order":"asc"}},
]
}
result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="topic-high-star", query_body=q,
offset=0, size=5000)
topic_id_list = [item["_source"]["id"] for item in result_dict["hits"]]
return topic_id_list
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return list()
def update_recommend_tag_list(self, device_id,user_feature=None): def update_recommend_tag_list(self, device_id,user_feature=None):
try: try:
recommend_tag_set = set() recommend_tag_set = set()
...@@ -99,14 +70,7 @@ class CollectData(object): ...@@ -99,14 +70,7 @@ class CollectData(object):
redis_client.expire(tag_recommend_redis_key, 7*24*60*60) redis_client.expire(tag_recommend_redis_key, 7*24*60*60)
redis_key = "physical:home_recommend" + ":device_id:" + device_id + ":query_type:" + str(TopicPageType.HOME_RECOMMEND) have_read_topic_id_list = Tools.get_have_read_topic_id_list(device_id,-1,TopicPageType.HOME_RECOMMEND)
have_read_topic_id_list = list()
redis_field_list = [b'have_read_topic_list']
redis_field_val_list = redis_client.hmget(redis_key, redis_field_list)
if redis_field_val_list[0]:
have_read_topic_id_list = list(json.loads(redis_field_val_list[0]))
recommend_topic_id_list = list() recommend_topic_id_list = list()
for index in range(0,1000): for index in range(0,1000):
...@@ -115,7 +79,7 @@ class CollectData(object): ...@@ -115,7 +79,7 @@ class CollectData(object):
redis_tag_id_data = redis_client.get(redis_tag_id_key) redis_tag_id_data = redis_client.get(redis_tag_id_key)
tag_topic_id_list = json.loads(redis_tag_id_data) if redis_tag_id_data else [] tag_topic_id_list = json.loads(redis_tag_id_data) if redis_tag_id_data else []
if not redis_tag_id_data: if not redis_tag_id_data:
tag_topic_id_list = self.get_tag_topic_list(tag_id) tag_topic_id_list = ESPerform.get_tag_topic_list(tag_id)
redis_client.set(redis_tag_id_key,json.dumps(tag_topic_id_list)) redis_client.set(redis_tag_id_key,json.dumps(tag_topic_id_list))
redis_client.expire(redis_tag_id_key,1*24*60*60) redis_client.expire(redis_tag_id_key,1*24*60*60)
......
...@@ -122,7 +122,7 @@ class TopicUtils(object): ...@@ -122,7 +122,7 @@ class TopicUtils(object):
@classmethod @classmethod
def get_recommend_topic_ids(cls,user_id,tag_id,offset,size,single_size,query=None,query_type=TopicPageType.FIND_PAGE, def get_recommend_topic_ids(cls,user_id,tag_id,offset,size,single_size,query=None,query_type=TopicPageType.FIND_PAGE,
filter_topic_id_list=[],test_score=False,must_topic_id_list=[],recommend_tag_list=[], filter_topic_id_list=[],test_score=False,must_topic_id_list=[],recommend_tag_list=[],
user_similar_score_list=[],index_type="topic"): user_similar_score_list=[],index_type="topic",routing=None):
""" """
:需增加打散逻辑 :需增加打散逻辑
:remark:获取首页推荐帖子列表 :remark:获取首页推荐帖子列表
...@@ -347,7 +347,7 @@ class TopicUtils(object): ...@@ -347,7 +347,7 @@ class TopicUtils(object):
"_score" "_score"
] ]
result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name=index_type, query_body=q, result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name=index_type, query_body=q,
offset=offset, size=size) offset=offset, size=size,routing=routing)
topic_id_list = list() topic_id_list = list()
same_group_id_set = set() same_group_id_set = set()
...@@ -388,7 +388,7 @@ class TopicUtils(object): ...@@ -388,7 +388,7 @@ class TopicUtils(object):
@classmethod @classmethod
def get_topic_detail_recommend_list(cls, user_id, topic_id, topic_tag_list, topic_pictorial_id, topic_user_id, def get_topic_detail_recommend_list(cls, user_id, topic_id, topic_tag_list, topic_pictorial_id, topic_user_id,
filter_topic_user_id, have_read_topic_list, offset, size, es_cli_obj=None,index_type="topic"): filter_topic_user_id, have_read_topic_list, offset, size, es_cli_obj=None,index_type="topic",routing=None):
""" """
:remark 帖子详情页推荐列表,缺少按时间衰减 :remark 帖子详情页推荐列表,缺少按时间衰减
:param user_id: :param user_id:
...@@ -465,7 +465,7 @@ class TopicUtils(object): ...@@ -465,7 +465,7 @@ class TopicUtils(object):
} }
result_dict = ESPerform.get_search_results(es_cli_obj, sub_index_name=index_type, query_body=q, result_dict = ESPerform.get_search_results(es_cli_obj, sub_index_name=index_type, query_body=q,
offset=offset, size=size) offset=offset, size=size,routing=routing)
return result_dict["hits"] return result_dict["hits"]
except: except:
......
...@@ -27,7 +27,7 @@ def get_discover_page_topic_ids(user_id, device_id, size, query_type=TopicPageTy ...@@ -27,7 +27,7 @@ def get_discover_page_topic_ids(user_id, device_id, size, query_type=TopicPageTy
recommend_topic_ids = TopicUtils.get_recommend_topic_ids(user_id=user_id, tag_id=0, offset=0, size=size,single_size=size, recommend_topic_ids = TopicUtils.get_recommend_topic_ids(user_id=user_id, tag_id=0, offset=0, size=size,single_size=size,
query_type=query_type, query_type=query_type,
filter_topic_id_list=have_read_topic_id_list,index_type="topic-high-star") filter_topic_id_list=have_read_topic_id_list,index_type="topic",routing="4,5")
have_read_topic_id_list.extend(recommend_topic_ids) have_read_topic_id_list.extend(recommend_topic_ids)
...@@ -95,7 +95,7 @@ def get_home_recommend_topic_ids(user_id, device_id, tag_id, offset, size, query ...@@ -95,7 +95,7 @@ def get_home_recommend_topic_ids(user_id, device_id, tag_id, offset, size, query
single_size=size,query=query, query_type=query_type, single_size=size,query=query, query_type=query_type,
filter_topic_id_list=have_read_topic_id_list, filter_topic_id_list=have_read_topic_id_list,
recommend_tag_list=recommend_topic_list, recommend_tag_list=recommend_topic_list,
user_similar_score_list=user_similar_score_redis_list,index_type="topic-high-star") user_similar_score_list=user_similar_score_redis_list,index_type="topic",routing="4,5")
have_read_group_id_set = set() have_read_group_id_set = set()
have_read_user_id_set = set() have_read_user_id_set = set()
unread_topic_id_dict = dict() unread_topic_id_dict = dict()
...@@ -285,7 +285,7 @@ def topic_detail_page_recommend(device_id="", user_id=-1, topic_id=-1, topic_pic ...@@ -285,7 +285,7 @@ def topic_detail_page_recommend(device_id="", user_id=-1, topic_id=-1, topic_pic
result_list = TopicUtils.get_topic_detail_recommend_list(user_id, topic_id, topic_tag_list, topic_pictorial_id, result_list = TopicUtils.get_topic_detail_recommend_list(user_id, topic_id, topic_tag_list, topic_pictorial_id,
topic_user_id, filter_topic_user_id, topic_user_id, filter_topic_user_id,
have_read_topic_list, offset, size, es_cli_obj,index_type="topic-high-star") have_read_topic_list, offset, size, es_cli_obj,index_type="topic",routing="4,5")
recommend_topic_ids_list = list() recommend_topic_ids_list = list()
if len(result_list) > 0: if len(result_list) > 0:
recommend_topic_ids_list = [item["_source"]["id"] for item in result_list] recommend_topic_ids_list = [item["_source"]["id"] for item in result_list]
...@@ -360,7 +360,7 @@ def query_topic_by_user_similarity(topic_similarity_score_dict, offset=0, size=1 ...@@ -360,7 +360,7 @@ def query_topic_by_user_similarity(topic_similarity_score_dict, offset=0, size=1
must_topic_id_list = list(topic_similarity_score_dict.keys()) must_topic_id_list = list(topic_similarity_score_dict.keys())
topic_id_list = TopicUtils.get_recommend_topic_ids(tag_id=0, user_id=-1, offset=offset, size=size,single_size=size, topic_id_list = TopicUtils.get_recommend_topic_ids(tag_id=0, user_id=-1, offset=offset, size=size,single_size=size,
must_topic_id_list=must_topic_id_list,index_type="topic-high-star") must_topic_id_list=must_topic_id_list,index_type="topic",routing="4,5")
return {"recommend_topic_ids": topic_id_list} return {"recommend_topic_ids": topic_id_list}
except: except:
......
...@@ -52,6 +52,19 @@ class Tag(models.Model): ...@@ -52,6 +52,19 @@ class Tag(models.Model):
update_time = models.DateTimeField(verbose_name=u'更新时间', default=datetime.datetime.fromtimestamp(0)) update_time = models.DateTimeField(verbose_name=u'更新时间', default=datetime.datetime.fromtimestamp(0))
class CommunityTagFollow(models.Model):
class Meta:
verbose_name=u"用户关注标签"
db_table="community_tag_follow"
id = models.IntegerField(primary_key=True,verbose_name=u"主键ID")
is_deleted = models.BooleanField(verbose_name=u"是否删除")
is_online = models.BooleanField(verbose_name=u"是否上线")
user_id = models.IntegerField(verbose_name=u"用户ID")
tag_id = models.IntegerField(verbose_name=u"帖子ID")
create_time = models.DateTimeField(verbose_name=u'创建时间',default=datetime.datetime.fromtimestamp(0))
update_time = models.DateTimeField(verbose_name=u'更新时间', default=datetime.datetime.fromtimestamp(0))
class CommunityTagTypeRelation(models.Model): class CommunityTagTypeRelation(models.Model):
class Meta: class Meta:
verbose_name=u"标签类型对应关系" verbose_name=u"标签类型对应关系"
......
...@@ -389,7 +389,16 @@ def get_type_info_map(): ...@@ -389,7 +389,16 @@ def get_type_info_map():
round_insert_chunk_size=5, round_insert_chunk_size=5,
round_insert_period=2, round_insert_period=2,
) )
# TypeInfo(
# name="account_user_tag", # 用户标签
# type="account_user_tag",
# model=pictorial.Pictorial,
# query_deferred=lambda: pictorial.Pictorial.objects.all().query,
# get_data_func=PictorialTransfer.get_poctorial_data,
# bulk_insert_chunk_size=100,
# round_insert_chunk_size=5,
# round_insert_period=2,
# )
] ]
type_info_map = { type_info_map = {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment