Commit 1ea19ffd authored by 赵威

Merge branch 'master' into offic

parents aa22891a ab172f78
......@@ -7,7 +7,7 @@ from libs.cache import redis_client
import logging
from linucb.views.linucb import LinUCB
import json
from trans2es.models.tag import TopicTag,Tag
from trans2es.models.tag import TopicTag, Tag
from trans2es.models.topic import TopicHomeRecommend
import traceback
from django.conf import settings
......@@ -26,11 +26,11 @@ def loads_data(data):
try:
result = json.loads(data)
msg = True
return result,msg
return result, msg
except:
result = msgpack.loads(data)
msg = False
return result,msg
return result, msg
class KafkaManager(object):
......@@ -38,7 +38,6 @@ class KafkaManager(object):
@classmethod
def get_kafka_consumer_ins(cls, topic_name=None):
if not cls.consumser_obj:
topic_name = settings.KAFKA_TOPIC_NAME if not topic_name else topic_name
gm_logging_name = settings.KAFKA_GM_LOGGING_TOPIC_NAME
......@@ -47,6 +46,7 @@ class KafkaManager(object):
return cls.consumser_obj
class CollectData(object):
def __init__(self):
......@@ -68,7 +68,6 @@ class CollectData(object):
# 默认
self.user_feature = [0, 1]
def _get_user_linucb_info(self, device_id, linucb_matrix_prefix):
try:
redis_key = linucb_matrix_prefix + str(device_id)
......@@ -83,7 +82,8 @@ class CollectData(object):
return dict()
def update_recommend_tag_list(self, device_id, user_feature=None, user_id=None, click_topic_tag_list=None,
new_user_click_tag_list=[], linucb_matrix_prefix=None, linucb_recommend_tag_prefix=None,
new_user_click_tag_list=[], linucb_matrix_prefix=None,
linucb_recommend_tag_prefix=None,
linucb_topic_ids_prefix=None, linucb_pictorial_ids_prefix=None):
try:
redis_linucb_tag_data_dict = self._get_user_linucb_info(device_id, linucb_matrix_prefix)
......@@ -101,13 +101,16 @@ class CollectData(object):
if len(recommend_tag_list) > 0:
tag_recommend_redis_key = linucb_recommend_tag_prefix + str(device_id)
redis_client.set(tag_recommend_redis_key, json.dumps(recommend_tag_list))
redis_client.expire(tag_recommend_redis_key, 30*24*60*60)
redis_client.expire(tag_recommend_redis_key, 30 * 24 * 60 * 60)
have_read_topic_id_list = Tools.get_have_read_topic_id_list(device_id,user_id,TopicPageType.HOME_RECOMMEND)
have_read_topic_id_list = Tools.get_have_read_topic_id_list(device_id, user_id,
TopicPageType.HOME_RECOMMEND)
have_read_lin_pictorial_id_list = Tools.get_have_read_lin_pictorial_id_list(device_id, user_id,
TopicPageType.HOME_RECOMMEND)
promote_recommend_topic_id_list = TopicHomeRecommend.objects.using(settings.SLAVE1_DB_NAME).filter(is_online=1).values_list("topic_id",flat=True)
promote_lin_pictorial_id_list = CommunityPictorialHomeFeed.objects.using(settings.SLAVE1_DB_NAME).filter(
promote_recommend_topic_id_list = TopicHomeRecommend.objects.using(settings.SLAVE1_DB_NAME).filter(
is_online=1).values_list("topic_id", flat=True)
promote_lin_pictorial_id_list = CommunityPictorialHomeFeed.objects.using(
settings.SLAVE1_DB_NAME).filter(
is_deleted=0, is_online=1).values_list("pictorial_id", flat=True)
have_read_topic_id_list.extend(promote_recommend_topic_id_list)
have_read_lin_pictorial_id_list.extend(promote_lin_pictorial_id_list)
......@@ -118,10 +121,11 @@ class CollectData(object):
recommend_topic_id_list_click_dict = dict()
recommend_lin_pictorial_id_list = list()
if click_topic_tag_list and len(click_topic_tag_list)>0:
if click_topic_tag_list and len(click_topic_tag_list) > 0:
click_topic_tag_list_same_tagset_ids = get_same_tagset_ids(click_topic_tag_list)
recommend_topic_id_list_click,recommend_topic_id_list_click_dict = ESPerform.get_tag_topic_list_dict(click_topic_tag_list_same_tagset_ids,
have_read_topic_id_list,size=2)
recommend_topic_id_list_click, recommend_topic_id_list_click_dict = ESPerform.get_tag_topic_list_dict(
click_topic_tag_list_same_tagset_ids,
have_read_topic_id_list, size=2)
if len(recommend_topic_id_list_click) > 0:
recommend_topic_id_list.extend(recommend_topic_id_list_click)
recommend_topic_id_list_dict.update(recommend_topic_id_list_click_dict)
......@@ -145,29 +149,31 @@ class CollectData(object):
# b"data"] else []
# cursor = int(str(redis_topic_data_dict[b"cursor"], encoding="utf-8"))
# if len(recommend_topic_id_list)==0 and cursor==0 and len(redis_topic_list)>0:
# have_read_topic_id_list.extend(redis_topic_list[:2])
if len(new_user_click_tag_list)>0:
if len(new_user_click_tag_list) > 0:
new_user_click_tag_list_same_tagset_ids = get_same_tagset_ids(new_user_click_tag_list)
tag_topic_id_list,tag_topic_dict = ESPerform.get_tag_topic_list_dict(new_user_click_tag_list_same_tagset_ids, have_read_topic_id_list)
recommend_lin_pictorial_id_list = ESPerform.get_tag_pictorial_id_list(new_user_click_tag_list_same_tagset_ids,
tag_topic_id_list, tag_topic_dict = ESPerform.get_tag_topic_list_dict(
new_user_click_tag_list_same_tagset_ids, have_read_topic_id_list)
recommend_lin_pictorial_id_list = ESPerform.get_tag_pictorial_id_list(
new_user_click_tag_list_same_tagset_ids,
have_read_lin_pictorial_id_list)
else:
tag_id_list_same_tagset_ids = get_same_tagset_ids(tag_id_list)
tag_topic_id_list,tag_topic_dict = ESPerform.get_tag_topic_list_dict(tag_id_list_same_tagset_ids,have_read_topic_id_list)
tag_topic_id_list, tag_topic_dict = ESPerform.get_tag_topic_list_dict(tag_id_list_same_tagset_ids,
have_read_topic_id_list)
recommend_lin_pictorial_id_list = ESPerform.get_tag_pictorial_id_list(tag_id_list_same_tagset_ids,
have_read_lin_pictorial_id_list)
if len(recommend_topic_id_list)>0 or len(tag_topic_id_list)>0 or len(new_user_click_tag_list) > 0:
if len(recommend_topic_id_list) > 0 or len(tag_topic_id_list) > 0 or len(new_user_click_tag_list) > 0:
tag_topic_id_list = recommend_topic_id_list + tag_topic_id_list
tag_topic_dict.update(recommend_topic_id_list_dict)
redis_data_dict = {
"data": json.dumps(tag_topic_id_list),
"datadict":json.dumps(tag_topic_dict),
"cursor":0
"datadict": json.dumps(tag_topic_dict),
"cursor": 0
}
redis_client.hmset(topic_recommend_redis_key,redis_data_dict)
redis_client.hmset(topic_recommend_redis_key, redis_data_dict)
if len(recommend_lin_pictorial_id_list) > 0:
pictorial_data_dict = {
"data": json.dumps(recommend_lin_pictorial_id_list),
......@@ -184,19 +190,19 @@ class CollectData(object):
def update_user_linucb_tag_info(self, reward, device_id, tag_id, user_feature, linucb_matrix_redis_prefix):
    """Apply one reward observation for ``tag_id`` to a device's LinUCB data.

    Delegates to ``LinUCB.update_linucb_info`` using the shared
    ``redis_client``.

    Args:
        reward: Reward value forwarded to LinUCB.
        device_id: Device whose LinUCB data is updated.
        tag_id: Tag the reward applies to.
        user_feature: Feature vector for the update; when falsy,
            ``self.user_feature`` is used instead.
        linucb_matrix_redis_prefix: Redis key prefix forwarded to LinUCB.

    Returns:
        Whatever ``LinUCB.update_linucb_info`` returns, or ``False`` when
        the update raised an exception.
    """
    try:
        user_feature = user_feature if user_feature else self.user_feature
        return LinUCB.update_linucb_info(user_feature, reward, tag_id, device_id,
                                         linucb_matrix_redis_prefix, redis_client)
    except Exception:
        # Best-effort update: log and report failure instead of propagating.
        # ``Exception`` (rather than a bare ``except``) lets SystemExit and
        # KeyboardInterrupt still stop the process.
        logging_exception()
        logging.error("update_user_linucb_tag_info error!")
        return False
def transfer_old_info2ctr_feature_key(self, device_id):
try:
# 移植老用户的lin标签参数信息到ctr特征策略
ctr_linucb_matrix_redis_prefix_key = self.ctr_linucb_matrix_redis_prefix + str(device_id)
linucb_matrix_redis_prefix_key = self.linucb_matrix_redis_prefix + str(device_id)
if redis_client.exists(ctr_linucb_matrix_redis_prefix_key): #如果新策略存在lin信息,则不需要移植
if redis_client.exists(ctr_linucb_matrix_redis_prefix_key): # 如果新策略存在lin信息,则不需要移植
return True
else:
if redis_client.exists(linucb_matrix_redis_prefix_key):
......@@ -223,7 +229,7 @@ class CollectData(object):
if redis_client.exists(linucb_recommend_pictorial_id_prefix):
older_device_info = redis_client.hgetall(linucb_recommend_pictorial_id_prefix)
redis_client.hmset(ctr_linucb_recommend_pictorial_id_prefix, older_device_info)
logging.info("transfer_old_info2ctr_feature_key sucess:"+str(device_id))
logging.info("transfer_old_info2ctr_feature_key sucess:" + str(device_id))
return True
except:
logging_exception()
......@@ -246,7 +252,41 @@ class CollectData(object):
logging.error("get_device_tag_ctr error!")
return 0.001
def consume_data_from_kafka(self,topic_name=None):
# 用户打标签加分
# 新增四种用户兴趣分行为
# 四种日志均为后端埋点日志
def transfer_update_recommend_tag_list(self, device_id, user_feature, user_id, tag_list, score_loop=1):
    """Reward every tag in ``tag_list`` and refresh the device's recommendations.

    For each of ``score_loop`` rounds, every tag receives a positive reward
    under both the plain LinUCB matrix prefix and the ctr-feature matrix
    prefix. Afterwards the recommendation lists are rebuilt once per prefix
    family.

    Args:
        device_id: Device whose LinUCB data is updated.
        user_feature: Feature vector used for the plain LinUCB update.
        user_id: User id forwarded to ``update_recommend_tag_list``.
        tag_list: Tag ids to reward. Items may be bare ids or 1-element
            rows such as those produced by ``values_list("id")``; rows are
            unwrapped to the bare id. ``None`` or an empty list is a no-op.
        score_loop: Number of times each tag is rewarded.

    Returns:
        None.
    """
    # Several callers pass ``values_list("id")`` rows (1-tuples) rather than
    # bare ids; unwrap them so downstream code always sees plain tag ids.
    tag_ids = [
        tag[0] if isinstance(tag, (tuple, list)) and len(tag) == 1 else tag
        for tag in (tag_list or [])
    ]
    if not tag_ids:
        return

    # Every event routed through this method is a positive signal.
    reward = 1

    # Migrate the old user's lin data to the ctr-feature strategy first.
    self.transfer_old_info2ctr_feature_key(device_id)

    for _ in range(score_loop):
        for tag_id in tag_ids:
            self.update_user_linucb_tag_info(reward, device_id, tag_id,
                                             user_feature,
                                             self.linucb_matrix_redis_prefix)
            # The ctr strategy uses the tag's ctr as both feature components.
            device_tag_ctr = self.get_device_tag_ctr(device_id, tag_id)
            user_feature_ctr = [device_tag_ctr, device_tag_ctr]
            self.update_user_linucb_tag_info(reward, device_id, tag_id,
                                             user_feature_ctr,
                                             self.ctr_linucb_matrix_redis_prefix)

    # Refresh the recommended tag data only after all tag updates are done.
    self.update_recommend_tag_list(device_id, user_feature, user_id,
                                   click_topic_tag_list=tag_ids,
                                   linucb_matrix_prefix=self.linucb_matrix_redis_prefix,
                                   linucb_recommend_tag_prefix=self.linucb_recommend_redis_prefix,
                                   linucb_topic_ids_prefix=self.linucb_recommend_topic_id_prefix,
                                   linucb_pictorial_ids_prefix=self.linucb_recommend_pictorial_id_prefix)
    self.update_recommend_tag_list(device_id, user_feature, user_id,
                                   click_topic_tag_list=tag_ids,
                                   linucb_matrix_prefix=self.ctr_linucb_matrix_redis_prefix,
                                   linucb_recommend_tag_prefix=self.ctr_linucb_recommend_redis_prefix,
                                   linucb_topic_ids_prefix=self.ctr_linucb_recommend_topic_id_prefix,
                                   linucb_pictorial_ids_prefix=self.ctr_linucb_recommend_pictorial_id_prefix)
def consume_data_from_kafka(self, topic_name=None):
try:
user_feature = [1, 1]
......@@ -257,11 +297,11 @@ class CollectData(object):
consume_msg = msg_dict[msg_key]
for ori_msg in consume_msg:
try:
raw_val_dict,msg = loads_data(ori_msg.value)
raw_val_dict, msg = loads_data(ori_msg.value)
if msg:
logging.info(ori_msg.value)
if "type" in raw_val_dict and \
(raw_val_dict["type"] in ("on_click_feed_topic_card","on_click_button")):
(raw_val_dict["type"] in ("on_click_feed_topic_card", "on_click_button")):
click_topic_tag_list = list()
device_id = ""
if "on_click_feed_topic_card" == raw_val_dict["type"]:
......@@ -283,7 +323,8 @@ class CollectData(object):
if is_collection:
topic_tag_list.append(tag_id)
tag_query_results = Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
id__in=topic_tag_list, is_online=True, is_deleted=False, is_category=False).values_list("id",
id__in=topic_tag_list, is_online=True, is_deleted=False,
is_category=False).values_list("id",
"is_ai")
for id, is_ai in tag_query_results:
click_topic_tag_list.append(id)
......@@ -298,13 +339,17 @@ class CollectData(object):
tag_name = raw_val_dict["params"]["extra_param"]
device_id = raw_val_dict["device"]["device_id"]
user_id = raw_val_dict["user_id"] if "user_id" in raw_val_dict else None
tag_list = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(name=tag_name,is_online=True,is_deleted=False, is_category=False).values_list("id",flat=True))
tag_list = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(name=tag_name,
is_online=True,
is_deleted=False,
is_category=False).values_list(
"id", flat=True))
click_topic_tag_list.extend(tag_list)
logging.info("query tag attention,positive tag_list,device_id:%s,query_name:%s,tag_list:%s" % (
logging.info(
"query tag attention,positive tag_list,device_id:%s,query_name:%s,tag_list:%s" % (
str(device_id), tag_name, str(click_topic_tag_list)))
logging.info("click_topic_tag_list:%s"%(str(click_topic_tag_list)))
logging.info("click_topic_tag_list:%s" % (str(click_topic_tag_list)))
is_click = 1
is_vote = 0
......@@ -324,9 +369,8 @@ class CollectData(object):
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature_ctr,
self.ctr_linucb_matrix_redis_prefix)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
if len(click_topic_tag_list)>0:
if len(click_topic_tag_list) > 0:
self.update_recommend_tag_list(device_id, user_feature, user_id,
click_topic_tag_list=click_topic_tag_list,
linucb_matrix_prefix=self.linucb_matrix_redis_prefix,
......@@ -392,9 +436,9 @@ class CollectData(object):
# # 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
# self.update_recommend_tag_list(device_id, user_feature, user_id)
elif "type" in raw_val_dict and "interest_choice_click_next" == raw_val_dict["type"]:
if isinstance(raw_val_dict["params"]["tagid_list"],str):
if isinstance(raw_val_dict["params"]["tagid_list"], str):
tagid_list = json.loads(raw_val_dict["params"]["tagid_list"])
elif isinstance(raw_val_dict["params"]["tagid_list"],list):
elif isinstance(raw_val_dict["params"]["tagid_list"], list):
tagid_list = raw_val_dict["params"]["tagid_list"]
else:
tagid_list = list()
......@@ -408,7 +452,7 @@ class CollectData(object):
if len(tagid_list) > 0:
tag_query_results = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
id__in=tagid_list, is_online=True, is_deleted=False,
is_category=False).values_list("id",flat =True))
is_category=False).values_list("id", flat=True))
is_click = 1
is_vote = 0
......@@ -491,7 +535,8 @@ class CollectData(object):
# 用户点击问题清单进linucb
elif b'content' in raw_val_dict:
data = json.loads(raw_val_dict[b'content'])
if 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and data['SYS']['action'] == "venus/community/skin_check/submit_questions":
if 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and data['SYS'][
'action'] == "venus/community/skin_check/submit_questions":
device_id = data['SYS']['cl_id']
tagid_list = list(data['APP'].get('answer_tag', []))
user_id = data['SYS'].get('user_id', None)
......@@ -511,7 +556,8 @@ class CollectData(object):
for i in range(5):
for tag_id in tag_query_results_multi:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature,
self.update_user_linucb_tag_info(reward, device_id, tag_id,
user_feature,
self.linucb_matrix_redis_prefix)
# 获取tag的ctr信息
device_tag_ctr = self.get_device_tag_ctr(device_id, tag_id)
......@@ -532,9 +578,11 @@ class CollectData(object):
linucb_topic_ids_prefix=self.ctr_linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.ctr_linucb_recommend_pictorial_id_prefix)
logging.info("skin_check topic type:%s, device_id:%s, tag_query_results:%s" %
(str(data['SYS']['action']), str(device_id), str(tag_query_results_multi)))
(str(data['SYS']['action']), str(device_id),
str(tag_query_results_multi)))
# 品牌问卷进linucb
elif 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and data['SYS']['action'] == "venus/community/survey_question/submit":
elif 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and data['SYS'][
'action'] == "venus/community/survey_question/submit":
device_id = data['SYS']['cl_id']
tagid_list = list(data['APP'].get('answer_tag', []))
user_id = data['SYS'].get('user_id', None)
......@@ -555,7 +603,8 @@ class CollectData(object):
for i in range(5):
for tag_id in tag_query_results_multi:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature,
self.update_user_linucb_tag_info(reward, device_id, tag_id,
user_feature,
self.linucb_matrix_redis_prefix)
# 获取tag的ctr信息
device_tag_ctr = self.get_device_tag_ctr(device_id, tag_id)
......@@ -576,16 +625,87 @@ class CollectData(object):
linucb_topic_ids_prefix=self.ctr_linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.ctr_linucb_recommend_pictorial_id_prefix)
logging.info("survey_question type:%s, device_id:%s, tagid_list:%s" %
(str(data['SYS']['action']), str(device_id), str(tag_query_results_multi)))
(str(data['SYS']['action']), str(device_id),
str(tag_query_results_multi)))
# 首页搜索精准匹配标签关键字进linucb
elif 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and \
"api/v1/cards/topic" in str(data['SYS'].get('action',"")):
logging.info("action=api/v1/cards/topic")
tag_name = data["APP"].get("query", [])
tag_list = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
name=tag_name).values_list("id"))
device_id = data["SYS"]["cl_id"]
user_id = data['SYS'].get('user_id', None)
self.transfer_update_recommend_tag_list(device_id, user_feature, user_id,
tag_list,
5)
logging.info(
"api/v1/cards/topic,device_id:%s,tag_list:%s" % (str(device_id), str(tag_list)))
# (客户端创建回答,后台创建回答或修改回答关联标签关键字进linucb
elif 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and \
("venus/community/topic/create" in str(data['SYS'].get('action',"")) or
"venus/sun/topic/edit" in str(data['SYS'].get('action',""))
):
action=str(data['SYS'].get('action',''))
logging.info("action=%s"%(action))
tag_ids = list(data["APP"].get("tag_ids", []))
tag_list = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
id__in=tag_ids, is_online=True, is_deleted=False,
is_category=False).values_list("id"))
device_id = data["SYS"]["cl_id"]
user_id = data['SYS'].get('user_id', None)
self.transfer_update_recommend_tag_list(device_id, user_feature, user_id,
tag_list,
10)
logging.info("%s,device_id:%s,tag_list:%s" % (action,
str(device_id), str(tag_list)))
# 创建问题关注标签关键字进linucb
elif 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and \
"venus/sun/pictorial/edit" in str(data['SYS'].get('action',"")):
action = str(data['SYS'].get('action', ''))
logging.info("action=%s" % (action))
tag_ids = list(data["APP"].get("tag_ids", []))
tag_list = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
id__in=tag_ids, is_online=True, is_deleted=False,
is_category=False).values_list("id"))
device_id = data["SYS"]["cl_id"]
user_id = data['SYS'].get('user_id', None)
self.transfer_update_recommend_tag_list(device_id, user_feature, user_id,
tag_list,
20)
logging.info("%s,device_id:%s,tag_list:%s" % (action,
str(device_id), str(tag_list)))
# kyc最后一题
elif 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and \
"venus/community/survey_question/record_kyc_last_question" in str(data['SYS'].get('action',"")):
action = str(data['SYS'].get('action', ''))
tag_ids = list(data["APP"].get("tag_ids", []))
logging.info('action:%s,tag_list:%s' % (action, str(tag_ids)))
tag_query_results = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
id__in=tag_ids, is_online=True, is_deleted=False,
is_category=False).values_list("id"))
tag_query_results = [i[0] for i in tag_query_results]
logging.info('action:%s,mysql query taglist:%s' % (action, str(tag_query_results)))
tag_query_results_multi = [i for i in tag_ids if i in tag_query_results]
device_id = data["SYS"]["cl_id"]
user_id = data['SYS'].get('user_id', None)
self.transfer_update_recommend_tag_list(device_id, user_feature, user_id,
tag_query_results_multi,
5)
logging.info("action:%s,device_id:%s,tag_list:%s" % (action,
str(device_id),
str(tag_query_results_multi)))
else:
if msg:
logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
logging.warning(
"unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
except:
logging_exception()
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
# 假设数据库连接异常,强制退出程序,supervisor重启linub
os._exit(0)
return True
except:
logging_exception()
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment