Commit 006ce26d authored by Kai's avatar Kai

merge master

parents 79014755 28701dcf
......@@ -11,22 +11,26 @@ import traceback
from libs.cache import redis_client
from trans2es.models.face_user_contrast_similar import FaceUserContrastSimilar,UserSimilarScore
import json
from linucb.utils.register_user_tag import RegisterUserTag
@shared_task
def write_to_es(es_type, pk_list, use_batch_query_set=False):
try:
pk_list = list(frozenset(pk_list))
type_info_map = get_type_info_map()
type_info = type_info_map[es_type]
logging.info("consume es_type:%s" % str(es_type))
type_info.insert_table_by_pk_list(
sub_index_name=es_type,
pk_list=pk_list,
use_batch_query_set=use_batch_query_set,
es=ESPerform.get_cli()
)
if es_type == "register_user_tag":
RegisterUserTag.get_register_user_tag(pk_list)
else:
type_info_map = get_type_info_map()
type_info = type_info_map[es_type]
logging.info("consume es_type:%s" % str(es_type))
type_info.insert_table_by_pk_list(
sub_index_name=es_type,
pk_list=pk_list,
use_batch_query_set=use_batch_query_set,
es=ESPerform.get_cli()
)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......
......@@ -155,7 +155,7 @@ class ESPerform(object):
bulk_actions = []
if sub_index_name=="topic":
if sub_index_name=="topic" or sub_index_name=="topic-star-routing":
for data in data_list:
if data:
bulk_actions.append({
......
......@@ -28,54 +28,62 @@ class RegisterUserTag(object):
linucb_device_id_register_tag_topic_id_prefix = "physical:linucb:register_tag_topic_recommend:device_id:"
linucb_user_id_register_tag_topic_id_prefix = "physical:linucb:register_tag_topic_recommend:user_id:"
linucb_register_user_tag_key = "physical:linucb:register_user_tag_info"
@classmethod
def get_register_user_tag(cls,pk_list):
try:
user_id_set = set()
# user_id_set = set()
user_id_dict = dict()
query_results = AccountUserTag.objects.filter(pk__in=pk_list)
for item in query_results:
tag_id = item.tag_id
user_id = item.user
user_tag_list = AccountUserTag.objects.filter(user=user_id).values_list("tag_id", flat=True)
user_id_dict[user_id] = user_tag_list
if user_id not in user_id_set:
user_id_set.add(user_id)
user_tag_list = AccountUserTag.objects.filter(user=user_id).values_list("tag_id",flat=True)
have_read_topic_id_list = Tools.get_have_read_topic_id_list(-1, user_id,
TopicPageType.HOME_RECOMMEND)
recommend_topic_id_list = list()
cycle_num = int(10000/len(user_tag_list))
for index in range(0,cycle_num):
for tag_id in user_tag_list:
redis_tag_id_key = cls.tag_topic_id_redis_prefix + str(tag_id)
redis_tag_id_data = redis_client.get(redis_tag_id_key)
tag_topic_id_list = json.loads(redis_tag_id_data) if redis_tag_id_data else []
if not redis_tag_id_data:
tag_topic_id_list = ESPerform.get_tag_topic_list(tag_id)
redis_client.set(redis_tag_id_key,json.dumps(tag_topic_id_list))
redis_client.expire(redis_tag_id_key,1*24*60*60)
if len(tag_topic_id_list)>index:
for topic_id in tag_topic_id_list[index:]:
if topic_id not in have_read_topic_id_list and topic_id not in recommend_topic_id_list:
recommend_topic_id_list.append(topic_id)
break
redis_register_tag_topic_data = {
"data": json.dumps(recommend_topic_id_list),
"cursor": 0
}
redis_client.hmset(cls.linucb_user_id_register_tag_topic_id_prefix,redis_register_tag_topic_data)
redis_client.expire(cls.linucb_user_id_register_tag_topic_id_prefix,30*24*60*60)
topic_recommend_redis_key = cls.linucb_user_id_recommend_topic_id_prefix + str(user_id)
redis_data_dict = {
"data": json.dumps(recommend_topic_id_list),
"cursor":0
}
redis_client.hmset(topic_recommend_redis_key,redis_data_dict)
redis_client.expire(topic_recommend_redis_key,30*24*60*60)
for user_id in user_id_dict:
redis_client.hset(cls.linucb_register_user_tag_key, user_id, json.dumps(list(user_id_dict[user_id])))
# if user_id not in user_id_set:
# user_id_set.add(user_id)
#
# user_tag_list = AccountUserTag.objects.filter(user=user_id).values_list("tag_id",flat=True)
#
# have_read_topic_id_list = Tools.get_have_read_topic_id_list(-1, user_id,
# TopicPageType.HOME_RECOMMEND)
# recommend_topic_id_list = list()
# cycle_num = int(10000/len(user_tag_list))
# for index in range(0,cycle_num):
# for tag_id in user_tag_list:
# redis_tag_id_key = cls.tag_topic_id_redis_prefix + str(tag_id)
# redis_tag_id_data = redis_client.get(redis_tag_id_key)
# tag_topic_id_list = json.loads(redis_tag_id_data) if redis_tag_id_data else []
# if not redis_tag_id_data:
# tag_topic_id_list = ESPerform.get_tag_topic_list(tag_id)
# redis_client.set(redis_tag_id_key,json.dumps(tag_topic_id_list))
# redis_client.expire(redis_tag_id_key,1*24*60*60)
#
# if len(tag_topic_id_list)>index:
# for topic_id in tag_topic_id_list[index:]:
# if topic_id not in have_read_topic_id_list and topic_id not in recommend_topic_id_list:
# recommend_topic_id_list.append(topic_id)
# break
#
# redis_register_tag_topic_data = {
# "data": json.dumps(recommend_topic_id_list),
# "cursor": 0
# }
# redis_client.hmset(cls.linucb_user_id_register_tag_topic_id_prefix,redis_register_tag_topic_data)
# redis_client.expire(cls.linucb_user_id_register_tag_topic_id_prefix,30*24*60*60)
#
# topic_recommend_redis_key = cls.linucb_user_id_recommend_topic_id_prefix + str(user_id)
# redis_data_dict = {
# "data": json.dumps(recommend_topic_id_list),
# "cursor":0
# }
# redis_client.hmset(topic_recommend_redis_key,redis_data_dict)
# redis_client.expire(topic_recommend_redis_key,30*24*60*60)
#
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......@@ -50,21 +50,21 @@ class CollectData(object):
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return dict()
def update_recommend_tag_list(self, device_id,user_feature=None,click_topic_tag_list=None):
def update_recommend_tag_list(self, device_id,user_feature=None,user_id=None):
try:
recommend_tag_set = set()
recommend_tag_list = list()
recommend_tag_dict = dict()
redis_linucb_tag_data_dict = self._get_user_linucb_info(device_id)
if len(redis_linucb_tag_data_dict) == 0:
recommend_tag_list = LinUCB.get_default_tag_list()
recommend_tag_list = LinUCB.get_default_tag_list(user_id)
LinUCB.init_device_id_linucb_info(redis_client, self.linucb_matrix_redis_prefix,device_id,recommend_tag_list)
else:
user_feature = user_feature if user_feature else self.user_feature
(recommend_tag_dict,recommend_tag_set) = LinUCB.linucb_recommend_tag(device_id,redis_linucb_tag_data_dict,user_feature,list(redis_linucb_tag_data_dict.keys()))
recommend_tag_list = list(recommend_tag_dict.keys())
if len(recommend_tag_dict) > 0:
recommend_tag_list = list(recommend_tag_set)
if len(recommend_tag_list) > 0:
tag_recommend_redis_key = self.linucb_recommend_redis_prefix + str(device_id)
redis_client.set(tag_recommend_redis_key, json.dumps(recommend_tag_list))
# Todo:设置过期时间,调研set是否支持
......@@ -74,15 +74,8 @@ class CollectData(object):
have_read_topic_id_list = Tools.get_have_read_topic_id_list(device_id,-1,TopicPageType.HOME_RECOMMEND)
recommend_topic_id_list = list()
if click_topic_tag_list:
recommend_topic_id_list_click = ESPerform.get_tag_topic_list(click_topic_tag_list,have_read_topic_id_list)
if len(recommend_topic_id_list) > 0:
num = min(len(recommend_topic_id_list),2)
recommend_topic_id_list.extend(recommend_topic_id_list_click[0:num])
tag_id_list = recommend_tag_list[0:100]
tag_topic_id_list = ESPerform.get_tag_topic_list(tag_id_list,have_read_topic_id_list)
recommend_topic_id_list.extend(tag_topic_id_list)
# for index in range(0,100):
# recommend_topic_id_list_es = list()
# for tag_id in recommend_tag_list[0:100]:
......@@ -111,7 +104,7 @@ class CollectData(object):
# recommend_topic_id_list.extend(recommend_topic_id_list_es)
topic_recommend_redis_key = self.linucb_recommend_topic_id_prefix + str(device_id)
redis_data_dict = {
"data": json.dumps(recommend_topic_id_list),
"data": json.dumps(tag_topic_id_list),
"cursor":0
}
redis_client.hmset(topic_recommend_redis_key,redis_data_dict)
......@@ -147,19 +140,18 @@ class CollectData(object):
if "type" in raw_val_dict and "on_click_feed_topic_card" == raw_val_dict["type"]:
topic_id = raw_val_dict["params"]["business_id"] or raw_val_dict["params"]["topic_id"]
device_id = raw_val_dict["device"]["device_id"]
user_id = raw_val_dict["user_id"] if "user_id" in raw_val_dict else None
logging.info("consume topic_id:%s,device_id:%s" % (str(topic_id), str(device_id)))
tag_list = list()
click_topic_tag_list = list()
click_sql_query_results = TopicTag.objects.using(settings.SLAVE_DB_NAME).filter(topic_id=topic_id).values_list("tag_id","is_online")
for tag_id,is_online in click_sql_query_results:
if is_online:
tag_list.append(tag_id)
tag_sql_query_results = Tag.objects.using(settings.SLAVE_DB_NAME).filter(id=tag_id).values_list("id","collection","is_ai")
for id,collection,is_ai in tag_sql_query_results:
if collection == 1 or is_ai == 1:
click_topic_tag_list.append(id)
# tag_sql_query_results = Tag.objects.using(settings.SLAVE_DB_NAME).filter(id=tag_id).values_list("id","collection","is_ai")
# for id,collection,is_ai in tag_sql_query_results:
# if collection == 1 or is_ai == 1:
tag_list.append(tag_id)
is_click = 1
is_vote = 0
......@@ -172,7 +164,7 @@ class CollectData(object):
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self.update_recommend_tag_list(device_id, user_feature,click_topic_tag_list)
self.update_recommend_tag_list(device_id, user_feature, user_id)
elif "type" in raw_val_dict and "page_precise_exposure" == raw_val_dict["type"]:
if isinstance(raw_val_dict["params"]["exposure_cards"],str):
exposure_cards_list = json.loads(raw_val_dict["params"]["exposure_cards"])
......@@ -181,6 +173,7 @@ class CollectData(object):
else:
exposure_cards_list = list()
device_id = raw_val_dict["device"]["device_id"]
user_id = raw_val_dict["user_id"] if "user_id" in raw_val_dict else None
exposure_topic_id_list = list()
for item in exposure_cards_list:
......@@ -218,7 +211,7 @@ class CollectData(object):
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self.update_recommend_tag_list(device_id, user_feature)
self.update_recommend_tag_list(device_id, user_feature, user_id)
else:
logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
except:
......
......@@ -20,11 +20,16 @@ class LinUCB:
default_tag_list = list()
@classmethod
def get_default_tag_list(cls):
def get_default_tag_list(cls,user_id):
try:
if len(cls.default_tag_list) == 0:
cls.default_tag_list = Tag.objects.using(settings.SLAVE_DB_NAME).filter(is_online=True,collection=1).values_list("id",flat=True)[0:100]
if user_id:
redis_tag_data = redis_client.hget("physical:linucb:register_user_tag_info", user_id)
cls.default_tag_list = json.loads(redis_tag_data) if redis_tag_data else []
if len(cls.default_tag_list) == 0:
cls.default_tag_list = Tag.objects.using(settings.SLAVE_DB_NAME).filter(is_online=True,collection=1).values_list("id",flat=True)[0:100]
return cls.default_tag_list
except:
......
......@@ -122,7 +122,7 @@ class TopicUtils(object):
@classmethod
def get_recommend_topic_ids(cls,user_id,tag_id,offset,size,single_size,query=None,query_type=TopicPageType.FIND_PAGE,
filter_topic_id_list=[],test_score=False,must_topic_id_list=[],recommend_tag_list=[],
user_similar_score_list=[],index_type="topic",routing=None):
user_similar_score_list=[],index_type="topic",routing=None,attention_tag_list=[]):
"""
:需增加打散逻辑
:remark:获取首页推荐帖子列表
......@@ -216,12 +216,12 @@ class TopicUtils(object):
# )
# query_tag_term_list = cls.___get_should_term_list(user_tag_list)
if len(user_tag_list) > 0:
if len(attention_tag_list) > 0:
functions_list.append(
{
"filter": {"bool": {
"should": {"terms": {"tag_list": user_tag_list}}}},
"weight": 1
"should": {"terms": {"tag_list": attention_tag_list}}}},
"weight": 60
}
)
# if len(recommend_tag_list)>0:
......
......@@ -90,12 +90,16 @@ def get_home_recommend_topic_ids(user_id, device_id, tag_id, offset, size, query
user_similar_score_redis_list = json.loads(
redis_user_similar_score_redis_val) if redis_user_similar_score_redis_val else []
redis_tag_data = redis_client.hget("physical:linucb:register_user_tag_info", user_id)
attention_tag_list = json.loads(redis_tag_data) if redis_tag_data else []
size = size-len(recommend_topic_list)
topic_id_list = TopicUtils.get_recommend_topic_ids(user_id=user_id, tag_id=tag_id, offset=offset, size=size,
single_size=size,query=query, query_type=query_type,
filter_topic_id_list=have_read_topic_id_list,
recommend_tag_list=recommend_topic_list,
user_similar_score_list=user_similar_score_redis_list,index_type="topic",routing="4,5")
user_similar_score_list=user_similar_score_redis_list,index_type="topic",routing="4,5",attention_tag_list=attention_tag_list)
have_read_group_id_set = set()
have_read_user_id_set = set()
unread_topic_id_dict = dict()
......
{
"dynamic":"strict",
"_routing": {"required": true},
"properties": {
"id":{"type":"long"},
"is_online":{"type":"boolean"},//上线
......
{
"dynamic":"strict",
"_routing": {"required": true},
"properties": {
"id":{"type":"long"},
"is_online":{"type":"boolean"},//上线
......
......@@ -75,12 +75,19 @@ class Pictorial(models.Model):
def get_effective(self,topic_id_list):
try:
topic_id_list = Topic.objects.filter(id__in=topic_id_list,content_level__in=[3,4,5,0],is_online=True).count()
if topic_id_list >= 5:
return True
else:
return False
effective_num = 0
ret = False
for topic_id in topic_id_list:
topic_id_object = Topic.objects.filter(id=int(topic_id)).first()
if topic_id_object and topic_id_object.is_online and int(topic_id_object.content_level) in [0,3,4,5]:
effective_num += 1
if effective_num >= 5:
ret = True
break
return ret
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
......
......@@ -107,11 +107,14 @@ class Topic(models.Model):
@property
def is_complaint(self):
"""是否被举报"""
try:
if TopicComplaint.objects.filter(topic_id=self.id, is_online=True).exists():
return True
if TopicComplaint.objects.filter(topic_id=self.id, is_online=True).exists():
return True
return False
return False
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
def topic_has_image(self):
try:
......@@ -197,14 +200,14 @@ class Topic(models.Model):
elif self.content_level == '3':
offline_score += 2.0
exposure_count = ActionSumAboutTopic.objects.using(settings.SLAVE_DB_NAME).filter(topic_id=self.id, data_type=1).count()
click_count = ActionSumAboutTopic.objects.using(settings.SLAVE_DB_NAME).filter(topic_id=self.id, data_type=2).count()
uv_num = ActionSumAboutTopic.objects.using(settings.SLAVE_DB_NAME).filter(topic_id=self.id, data_type=3).count()
if exposure_count > 0:
offline_score += click_count / exposure_count
if uv_num > 0:
offline_score += (self.vote_num / uv_num + self.reply_num / uv_num)
# exposure_count = ActionSumAboutTopic.objects.using(settings.SLAVE_DB_NAME).filter(topic_id=self.id, data_type=1).count()
# click_count = ActionSumAboutTopic.objects.using(settings.SLAVE_DB_NAME).filter(topic_id=self.id, data_type=2).count()
# uv_num = ActionSumAboutTopic.objects.using(settings.SLAVE_DB_NAME).filter(topic_id=self.id, data_type=3).count()
#
# if exposure_count > 0:
# offline_score += click_count / exposure_count
# if uv_num > 0:
# offline_score += (self.vote_num / uv_num + self.reply_num / uv_num)
"""
1:马甲账号是否对总分降权?
......@@ -247,7 +250,6 @@ class PictorialTopic(models.Model):
pictorial_id = models.BigIntegerField(verbose_name=u'画报ID')
topic_id = models.BigIntegerField(verbose_name=u'帖子ID')
is_online = models.BooleanField(verbose_name=u"是否有效", default=True)
is_online = models.BooleanField(verbose_name=u'是否上线')
is_deleted = models.BooleanField(verbose_name=u'是否删除')
......
......@@ -143,8 +143,6 @@ class TypeInfo(object):
))
else:
if data:
data_list.append(data)
if self.type == "topic":
ori_topic_star = redis_client.hget(self.physical_topic_star, data["id"])
if not ori_topic_star:
......@@ -155,10 +153,11 @@ class TypeInfo(object):
old_data = copy.deepcopy(data)
old_data["is_online"] = False
old_data["is_deleted"] = True
old_data["content_level"] = ori_topic_star
old_data["content_level"] = int_ori_topic_star
data_list.append(old_data)
redis_client.hset(self.physical_topic_star, data["id"], data["content_level"])
data_list.append(data)
# if self.type=="topic" and instance.content_level and int(instance.content_level)>=4:
# topic_data_high_star_list.append(data)
......
......@@ -115,8 +115,8 @@ class TopicTransfer(object):
if is_excellect:
res["is_excellent"] = 1
else:
excelllect_object = ExcellentTopic.objects.filter(topic_id=instance.id,is_online=True,is_deleted=False).first()
if excelllect_object:
excelllect_object = ExcellentTopic.objects.filter(topic_id=instance.id).first()
if excelllect_object and excelllect_object.is_online and not excelllect_object.is_deleted:
res["is_excellent"] = 1
else:
res["is_excellent"] = 0
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment