Commit 156448d4 authored by 段英荣's avatar 段英荣

Merge branch '2.3_relaces' into 'master'

客户端debug

See merge request alpha/physical!463
parents 6e9ee2c5 c042674c
...@@ -18,6 +18,18 @@ from trans2es.models.pictorial import CommunityPictorialHomeFeed ...@@ -18,6 +18,18 @@ from trans2es.models.pictorial import CommunityPictorialHomeFeed
from libs.error import logging_exception from libs.error import logging_exception
import os import os
from search.views.tag import get_same_tagset_ids from search.views.tag import get_same_tagset_ids
import msgpack
def loads_data(data):
try:
result = json.loads(data)
msg = True
return result,msg
except:
result = msgpack.loads(data)
msg = False
return result,msg
class KafkaManager(object): class KafkaManager(object):
...@@ -28,8 +40,9 @@ class KafkaManager(object): ...@@ -28,8 +40,9 @@ class KafkaManager(object):
if not cls.consumser_obj: if not cls.consumser_obj:
topic_name = settings.KAFKA_TOPIC_NAME if not topic_name else topic_name topic_name = settings.KAFKA_TOPIC_NAME if not topic_name else topic_name
cls.consumser_obj = KafkaConsumer(topic_name,bootstrap_servers=settings.KAFKA_BROKER_LIST) gm_logging_name = settings.KAFKA_GM_LOGGING_TOPIC_NAME
# cls.consumser_obj.subscribe([topic_name]) cls.consumser_obj = KafkaConsumer(bootstrap_servers=settings.KAFKA_BROKER_LIST)
cls.consumser_obj.subscribe([topic_name, gm_logging_name])
return cls.consumser_obj return cls.consumser_obj
...@@ -41,6 +54,7 @@ class CollectData(object): ...@@ -41,6 +54,7 @@ class CollectData(object):
self.linucb_recommend_redis_prefix = "physical:linucb:tag_recommend:device_id:" self.linucb_recommend_redis_prefix = "physical:linucb:tag_recommend:device_id:"
#推荐帖子 #推荐帖子
self.linucb_recommend_topic_id_prefix = "physical:linucb:topic_recommend:device_id:" self.linucb_recommend_topic_id_prefix = "physical:linucb:topic_recommend:device_id:"
# 推荐榜单
self.linucb_recommend_pictorial_id_prefix = "physical:linucb:pictorial_recommend:device_id:" self.linucb_recommend_pictorial_id_prefix = "physical:linucb:pictorial_recommend:device_id:"
self.tag_topic_id_redis_prefix = "physical:tag_id:topic_id_list:" self.tag_topic_id_redis_prefix = "physical:tag_id:topic_id_list:"
self.click_recommend_redis_key_prefix = "physical:click_recommend:device_id:" self.click_recommend_redis_key_prefix = "physical:click_recommend:device_id:"
...@@ -176,13 +190,11 @@ class CollectData(object): ...@@ -176,13 +190,11 @@ class CollectData(object):
consume_msg = msg_dict[msg_key] consume_msg = msg_dict[msg_key]
for ori_msg in consume_msg: for ori_msg in consume_msg:
try: try:
logging.info(ori_msg) raw_val_dict,msg = loads_data(ori_msg.value)
if msg:
raw_val_dict = json.loads(ori_msg.value) logging.info(ori_msg.value)
if "type" in raw_val_dict and \ if "type" in raw_val_dict and \
(raw_val_dict["type"] in ("on_click_feed_topic_card","on_click_button")): (raw_val_dict["type"] in ("on_click_feed_topic_card","on_click_button")):
click_topic_tag_list = list() click_topic_tag_list = list()
if "on_click_feed_topic_card" == raw_val_dict["type"]: if "on_click_feed_topic_card" == raw_val_dict["type"]:
topic_id = raw_val_dict["params"]["topic_id"] topic_id = raw_val_dict["params"]["topic_id"]
...@@ -318,6 +330,60 @@ class CollectData(object): ...@@ -318,6 +330,60 @@ class CollectData(object):
new_user_click_tag_list=tag_query_results) new_user_click_tag_list=tag_query_results)
else: else:
logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type")) logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
# 用户点击个性化push进linucb
elif "type" in raw_val_dict and raw_val_dict["type"] == "on_click_push":
if isinstance(raw_val_dict["params"]["tag_ids"], str):
tagid_list = json.loads(raw_val_dict["params"]["tag_ids"])
elif isinstance(raw_val_dict["params"]["tag_ids"], list):
tagid_list = raw_val_dict["params"]["tag_ids"]
else:
tagid_list = list()
device_id = raw_val_dict["device"]["device_id"]
user_id = raw_val_dict["user_id"] if "user_id" in raw_val_dict else None
if len(tagid_list) > 0:
tag_query_results = Tag.objects.using(settings.SLAVE_DB_NAME).filter(
id__in=tagid_list, is_online=True, is_deleted=False,
is_category=False).values_list("id", flat=True)
is_click = 1
is_vote = 0
reward = 1 if is_click or is_vote else 0
for tag_id in tag_query_results:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
self.update_recommend_tag_list(device_id, user_feature, user_id,
new_user_click_tag_list=tag_query_results)
logging.info("on_click_push topic type:%s, device_id:%s, answer_tag:%s" %
(raw_val_dict.get("type", "missing type"), str(device_id),
str(tagid_list)))
# 用户点击问题清单进linucb
elif b'content' in raw_val_dict:
data = json.loads(raw_val_dict[b'content'])
if 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and data['SYS']['action'] == "venus/community/skin_check/submit_questions":
device_id = data['SYS']['cl_id']
tagid_list = list(data['APP']['answer_tag']) or []
user_id = data['SYS']['user_id'] or None
logging.info("skin_check topic type:%s, device_id:%s, answer_tag:%s" %
(str(data['SYS']['action']), str(device_id), str(tagid_list)))
if len(tagid_list) > 0:
tag_query_results = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
id__in=tagid_list, is_online=True, is_deleted=False,
is_category=False).values_list("id", flat=True))
tag_query_results_multi = [i for i in tagid_list if i in tag_query_results]
is_click = 1
is_vote = 0
reward = 1 if is_click or is_vote else 0
for i in range(5):
for tag_id in tag_query_results_multi:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self.update_recommend_tag_list(device_id, user_feature, user_id,
new_user_click_tag_list=tag_query_results)
logging.info("skin_check topic type:%s, device_id:%s, tag_query_results:%s" %
(str(data['SYS']['action']), str(device_id), str(tag_query_results)))
else:
if msg:
logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
except: except:
logging_exception() logging_exception()
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......
...@@ -165,6 +165,8 @@ class LinUCB: ...@@ -165,6 +165,8 @@ class LinUCB:
} }
redis_cli.hset(redis_key, tag_id, pickle.dumps(user_tag_dict)) redis_cli.hset(redis_key, tag_id, pickle.dumps(user_tag_dict))
else:
logging.warning("not standard linucb reward")
return True return True
except: except:
logging_exception() logging_exception()
......
...@@ -124,7 +124,7 @@ class TopicUtils(object): ...@@ -124,7 +124,7 @@ class TopicUtils(object):
query_type=TopicPageType.HOME_RECOMMEND, query_type=TopicPageType.HOME_RECOMMEND,
filter_topic_id_list=[], test_score=False, must_topic_id_list=[], recommend_tag_list=[], filter_topic_id_list=[], test_score=False, must_topic_id_list=[], recommend_tag_list=[],
user_similar_score_list=[], index_type="topic", routing=None, attention_tag_list=[], user_similar_score_list=[], index_type="topic", routing=None, attention_tag_list=[],
linucb_user_id_list=[], disable_collpase=False): linucb_user_id_list=[], disable_collpase=False, has_score=False):
""" """
:remark:获取首页推荐帖子列表 :remark:获取首页推荐帖子列表
:param user_id: :param user_id:
...@@ -160,7 +160,7 @@ class TopicUtils(object): ...@@ -160,7 +160,7 @@ class TopicUtils(object):
ret_data_list = list() ret_data_list = list()
topic_id_list = list() topic_id_list = list()
topic_score_list = list()
q["query"] = dict() q["query"] = dict()
functions_list = list() functions_list = list()
...@@ -311,6 +311,8 @@ class TopicUtils(object): ...@@ -311,6 +311,8 @@ class TopicUtils(object):
for item in result_dict["hits"]: for item in result_dict["hits"]:
topic_id_list.append(item["_source"]["id"]) topic_id_list.append(item["_source"]["id"])
if has_score:
topic_score_list.append(item["_score"])
else: else:
multi_match = { multi_match = {
...@@ -431,11 +433,17 @@ class TopicUtils(object): ...@@ -431,11 +433,17 @@ class TopicUtils(object):
ret_data_list.append({"id": item["_source"]["id"], "highlight": item.get("highlight", {})}) ret_data_list.append({"id": item["_source"]["id"], "highlight": item.get("highlight", {})})
topic_id_list.append(item["_source"]["id"]) topic_id_list.append(item["_source"]["id"])
if has_score:
return topic_id_list,ret_data_list return topic_id_list,ret_data_list,topic_score_list
else:
return topic_id_list,ret_data_list
except: except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return list(),list() if has_score:
return list(), list(), list()
else:
return list(), list()
@classmethod @classmethod
def userful_tag_topic_list(cls, user_id, have_read_topic_list, size, def userful_tag_topic_list(cls, user_id, have_read_topic_list, size,
...@@ -520,6 +528,73 @@ class TopicUtils(object): ...@@ -520,6 +528,73 @@ class TopicUtils(object):
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return [] return []
@classmethod
def get_linucb_topic_info_for_debug(cls, size,
index_type="topic-high-star", routing=None, linucb_topic_list=[]):
try:
es_cli_obj = ESPerform.get_cli()
if len(linucb_topic_list) == 0:
return {}
else:
q = dict()
q["query"] = {
"bool": {
"must": [
{"terms": {"id": linucb_topic_list}}
]
}
}
q["_source"] = {
"includes": ["id","content_level","edit_tag_list"]
}
result_dict = ESPerform.get_search_results(es_cli_obj, sub_index_name=index_type, query_body=q,
size=size,
routing=routing)
topic_id_dict = dict()
for item in result_dict["hits"]:
topic_id_dict.update({item["_source"]["id"]:{"content_level":item["_source"]["content_level"],"edit_tag_list":item["_source"]["edit_tag_list"]}})
return topic_id_dict
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return []
@classmethod
def get_linucb_pictorial_info_for_debug(cls,size,linucb_pictorial_list = []):
try:
q = {
"query": {
"function_score": {
"query": {
"bool": {
"must": [
{"terms": {"id": linucb_pictorial_list}}
]
}
},
"boost_mode": "sum",
"score_mode": "sum",
}
},
"_source": {
"include": ["id", "edit_tag_list"]
}
}
result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="pictorial",
query_body=q,
offset=0, size=size)
pictorial_id_dict = dict()
for item in result_dict["hits"]:
pictorial_id_dict.update({item["_source"]["id"]: {"edit_tag_list": item["_source"]["edit_tag_list"]}})
return pictorial_id_dict
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return list()
@classmethod @classmethod
def get_topic_detail_recommend_list(cls, user_id, topic_id, topic_tag_list, topic_pictorial_id, topic_user_id, def get_topic_detail_recommend_list(cls, user_id, topic_id, topic_tag_list, topic_pictorial_id, topic_user_id,
filter_topic_user_id, have_read_topic_list, offset, size, es_cli_obj=None, filter_topic_user_id, have_read_topic_list, offset, size, es_cli_obj=None,
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment