Commit 8f4369d0 authored by 高雅喆

linucb增加ctr特征,生成两个用户lin推荐队列

parent 85ab77fa
...@@ -51,12 +51,16 @@ class CollectData(object): ...@@ -51,12 +51,16 @@ class CollectData(object):
def __init__(self): def __init__(self):
# lin tag参数 # lin tag参数
self.linucb_matrix_redis_prefix = "physical:linucb:device_id:" self.linucb_matrix_redis_prefix = "physical:linucb:device_id:"
self.ctr_linucb_matrix_redis_prefix = "ctr_physical:linucb:device_id:"
# lin推荐tag # lin推荐tag
self.linucb_recommend_redis_prefix = "physical:linucb:tag_recommend:device_id:" self.linucb_recommend_redis_prefix = "physical:linucb:tag_recommend:device_id:"
self.ctr_linucb_recommend_redis_prefix = "ctr_physical:linucb:tag_recommend:device_id:"
# 推荐帖子 # 推荐帖子
self.linucb_recommend_topic_id_prefix = "physical:linucb:topic_recommend:device_id:" self.linucb_recommend_topic_id_prefix = "physical:linucb:topic_recommend:device_id:"
self.ctr_linucb_recommend_topic_id_prefix = "ctr_physical:linucb:topic_recommend:device_id:"
# 推荐榜单 # 推荐榜单
self.linucb_recommend_pictorial_id_prefix = "physical:linucb:pictorial_recommend:device_id:" self.linucb_recommend_pictorial_id_prefix = "physical:linucb:pictorial_recommend:device_id:"
self.ctr_linucb_recommend_pictorial_id_prefix = "ctr_physical:linucb:pictorial_recommend:device_id:"
self.tag_topic_id_redis_prefix = "physical:tag_id:topic_id_list:" self.tag_topic_id_redis_prefix = "physical:tag_id:topic_id_list:"
self.click_recommend_redis_key_prefix = "physical:click_recommend:device_id:" self.click_recommend_redis_key_prefix = "physical:click_recommend:device_id:"
...@@ -64,9 +68,9 @@ class CollectData(object): ...@@ -64,9 +68,9 @@ class CollectData(object):
self.user_feature = [0, 1] self.user_feature = [0, 1]
def _get_user_linucb_info(self, device_id): def _get_user_linucb_info(self, device_id, linucb_matrix_prefix):
try: try:
redis_key = self.linucb_matrix_redis_prefix + str(device_id) redis_key = linucb_matrix_prefix + str(device_id)
# dict的key为标签ID,value为4个矩阵 # dict的key为标签ID,value为4个矩阵
redis_linucb_tag_data_dict = redis_client.hgetall(redis_key) redis_linucb_tag_data_dict = redis_client.hgetall(redis_key)
...@@ -77,21 +81,26 @@ class CollectData(object): ...@@ -77,21 +81,26 @@ class CollectData(object):
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return dict() return dict()
def update_recommend_tag_list(self, device_id,user_feature=None,user_id=None,click_topic_tag_list=None,new_user_click_tag_list = []): def update_recommend_tag_list(self, device_id, user_feature=None, user_id=None, click_topic_tag_list=None,
new_user_click_tag_list=[], linucb_matrix_prefix=None, linucb_recommend_tag_prefix=None,
linucb_topic_ids_prefix=None, linucb_pictorial_ids_prefix=None):
try: try:
redis_linucb_tag_data_dict = self._get_user_linucb_info(device_id) redis_linucb_tag_data_dict = self._get_user_linucb_info(device_id, linucb_matrix_prefix)
if len(redis_linucb_tag_data_dict) == 0: if len(redis_linucb_tag_data_dict) == 0:
recommend_tag_list = list(LinUCB.get_default_tag_list(user_id)) recommend_tag_list = list(LinUCB.get_default_tag_list(user_id))
LinUCB.init_device_id_linucb_info(redis_client, self.linucb_matrix_redis_prefix,device_id,recommend_tag_list) LinUCB.init_device_id_linucb_info(redis_client, linucb_matrix_prefix, device_id, recommend_tag_list)
else: else:
user_feature = user_feature if user_feature else self.user_feature user_feature = user_feature if user_feature else self.user_feature
(recommend_tag_dict,recommend_tag_set) = LinUCB.linucb_recommend_tag(device_id,redis_linucb_tag_data_dict,user_feature,list(redis_linucb_tag_data_dict.keys())) linucb_tag_list = list(redis_linucb_tag_data_dict.keys())
(recommend_tag_dict, recommend_tag_set) = LinUCB.linucb_recommend_tag(device_id,
redis_linucb_tag_data_dict,
user_feature, linucb_tag_list)
recommend_tag_list = list(recommend_tag_dict.keys()) recommend_tag_list = list(recommend_tag_dict.keys())
if len(recommend_tag_list) > 0: if len(recommend_tag_list) > 0:
tag_recommend_redis_key = self.linucb_recommend_redis_prefix + str(device_id) tag_recommend_redis_key = linucb_recommend_tag_prefix + str(device_id)
redis_client.set(tag_recommend_redis_key, json.dumps(recommend_tag_list)) redis_client.set(tag_recommend_redis_key, json.dumps(recommend_tag_list))
# redis_client.expire(tag_recommend_redis_key, 7*24*60*60) redis_client.expire(tag_recommend_redis_key, 30*24*60*60)
have_read_topic_id_list = Tools.get_have_read_topic_id_list(device_id,user_id,TopicPageType.HOME_RECOMMEND) have_read_topic_id_list = Tools.get_have_read_topic_id_list(device_id,user_id,TopicPageType.HOME_RECOMMEND)
have_read_lin_pictorial_id_list = Tools.get_have_read_lin_pictorial_id_list(device_id, user_id, have_read_lin_pictorial_id_list = Tools.get_have_read_lin_pictorial_id_list(device_id, user_id,
...@@ -125,8 +134,8 @@ class CollectData(object): ...@@ -125,8 +134,8 @@ class CollectData(object):
# redis_client.hmset(click_recommend_redis_key, click_redis_data_dict) # redis_client.hmset(click_recommend_redis_key, click_redis_data_dict)
tag_id_list = recommend_tag_list[0:20] tag_id_list = recommend_tag_list[0:20]
pictorial_recommend_redis_key = self.linucb_recommend_pictorial_id_prefix + str(device_id) pictorial_recommend_redis_key = linucb_pictorial_ids_prefix + str(device_id)
topic_recommend_redis_key = self.linucb_recommend_topic_id_prefix + str(device_id) topic_recommend_redis_key = linucb_topic_ids_prefix + str(device_id)
# redis_topic_data_dict = redis_client.hgetall(topic_recommend_redis_key) # redis_topic_data_dict = redis_client.hgetall(topic_recommend_redis_key)
# redis_topic_list = list() # redis_topic_list = list()
# cursor = -1 # cursor = -1
...@@ -171,19 +180,59 @@ class CollectData(object): ...@@ -171,19 +180,59 @@ class CollectData(object):
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False return False
def update_user_linucb_tag_info(self, reward, device_id, tag_id, user_feature=None, linucb_matrix_redis_prefix=None):
    """Update one tag's LinUCB parameters for a device.

    Delegates to ``LinUCB.update_linucb_info`` using the module-level
    ``redis_client``.

    Args:
        reward: Reward value forwarded to ``LinUCB.update_linucb_info``.
        device_id: Device whose LinUCB data is updated.
        tag_id: Tag whose parameters are updated.
        user_feature: Feature vector; a falsy value falls back to
            ``self.user_feature``.
        linucb_matrix_redis_prefix: Redis key prefix selecting which
            strategy's matrices are updated; ``None`` falls back to
            ``self.linucb_matrix_redis_prefix`` so callers written against
            the older four-argument signature keep working.

    Returns:
        Whatever ``LinUCB.update_linucb_info`` returns, or ``False`` if an
        exception was raised.
    """
    try:
        user_feature = user_feature if user_feature else self.user_feature
        if linucb_matrix_redis_prefix is None:
            linucb_matrix_redis_prefix = self.linucb_matrix_redis_prefix
        return LinUCB.update_linucb_info(user_feature, reward, tag_id, device_id,
                                         linucb_matrix_redis_prefix, redis_client)
    except Exception:
        # Best-effort update: log and report failure, but let
        # KeyboardInterrupt / SystemExit propagate.
        logging_exception()
        logging.error("update_user_linucb_tag_info error!")
        return False
def _copy_redis_key_if_present(self, source_key, target_key, is_hash):
    """Copy ``source_key`` to ``target_key`` when the source holds data.

    Reads the source once instead of checking ``exists`` first, so a key
    that disappears between the check and the read cannot lead to writing
    an empty mapping or ``None``.
    """
    if is_hash:
        source_data = redis_client.hgetall(source_key)
        if source_data:
            redis_client.hmset(target_key, source_data)
    else:
        source_data = redis_client.get(source_key)
        if source_data is not None:
            redis_client.set(target_key, source_data)

def transfer_old_info2ctr_feature_key(self, device_id):
    """Seed the CTR-feature strategy keys from a device's existing LinUCB keys.

    If the CTR LinUCB matrix key already exists for the device, nothing is
    copied. Otherwise the matrix hash is copied, and then the recommended-tag
    string, the topic-recommend hash and the pictorial-recommend hash are
    each copied when their CTR counterpart does not exist yet.

    Args:
        device_id: Device whose keys are migrated.

    Returns:
        ``True`` when migration was skipped or finished, ``False`` if an
        exception was raised.
    """
    try:
        key_suffix = str(device_id)

        # The new strategy already has LinUCB data: no migration needed.
        ctr_matrix_key = self.ctr_linucb_matrix_redis_prefix + key_suffix
        if redis_client.exists(ctr_matrix_key):
            return True

        # Migrate the LinUCB tag parameters (target is known to be absent).
        self._copy_redis_key_if_present(self.linucb_matrix_redis_prefix + key_suffix,
                                        ctr_matrix_key, True)

        # (old prefix, ctr prefix, stored as hash?) for the remaining keys:
        # recommended tag list, topic recommend queue, pictorial recommend queue.
        # NOTE(review): copied keys get no TTL here — confirm whether the
        # recommended-tag copy should carry an expiry.
        remaining_migrations = (
            (self.linucb_recommend_redis_prefix,
             self.ctr_linucb_recommend_redis_prefix, False),
            (self.linucb_recommend_topic_id_prefix,
             self.ctr_linucb_recommend_topic_id_prefix, True),
            (self.linucb_recommend_pictorial_id_prefix,
             self.ctr_linucb_recommend_pictorial_id_prefix, True),
        )
        for old_prefix, ctr_prefix, is_hash in remaining_migrations:
            target_key = ctr_prefix + key_suffix
            if not redis_client.exists(target_key):
                self._copy_redis_key_if_present(old_prefix + key_suffix, target_key, is_hash)

        logging.info("transfer_old_info2ctr_feature_key sucess:"+str(device_id))
        return True
    except Exception:
        # Best-effort migration: log and report failure, but let
        # KeyboardInterrupt / SystemExit propagate.
        logging_exception()
        logging.error("transfer_old_info2ctr_feature_key error!")
        return False
def consume_data_from_kafka(self,topic_name=None): def consume_data_from_kafka(self,topic_name=None):
try: try:
user_feature = [1,1] user_feature = [1,1]
user_feature_ctr = [0.5, 0.5]
kafka_consumer_obj = KafkaManager.get_kafka_consumer_ins(topic_name) kafka_consumer_obj = KafkaManager.get_kafka_consumer_ins(topic_name)
while True: while True:
...@@ -245,12 +294,31 @@ class CollectData(object): ...@@ -245,12 +294,31 @@ class CollectData(object):
reward = 1 if is_click or is_vote else 0 reward = 1 if is_click or is_vote else 0
# 移植老用户的lin信息到ctr特征策略
self.transfer_old_info2ctr_feature_key(device_id)
# 更新不同策略的lin标签参数信息
for tag_id in click_topic_tag_list: for tag_id in click_topic_tag_list:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature) self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature,
self.linucb_matrix_redis_prefix)
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature_ctr,
self.ctr_linucb_matrix_redis_prefix)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后 # 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
if len(click_topic_tag_list)>0: if len(click_topic_tag_list)>0:
self.update_recommend_tag_list(device_id, user_feature, user_id,click_topic_tag_list=click_topic_tag_list) self.update_recommend_tag_list(device_id, user_feature, user_id,
click_topic_tag_list=click_topic_tag_list,
linucb_matrix_prefix=self.linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.linucb_recommend_pictorial_id_prefix)
self.update_recommend_tag_list(device_id, user_feature_ctr, user_id,
click_topic_tag_list=click_topic_tag_list,
linucb_matrix_prefix=self.ctr_linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.ctr_linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.ctr_linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.ctr_linucb_recommend_pictorial_id_prefix)
# elif "type" in raw_val_dict and "page_precise_exposure" == raw_val_dict["type"]: # elif "type" in raw_val_dict and "page_precise_exposure" == raw_val_dict["type"]:
# if isinstance(raw_val_dict["params"]["exposure_cards"],str): # if isinstance(raw_val_dict["params"]["exposure_cards"],str):
# exposure_cards_list = json.loads(raw_val_dict["params"]["exposure_cards"]) # exposure_cards_list = json.loads(raw_val_dict["params"]["exposure_cards"])
...@@ -324,12 +392,29 @@ class CollectData(object): ...@@ -324,12 +392,29 @@ class CollectData(object):
is_vote = 0 is_vote = 0
reward = 1 if is_click or is_vote else 0 reward = 1 if is_click or is_vote else 0
# 移植老用户的lin信息到ctr特征策略
self.transfer_old_info2ctr_feature_key(device_id)
for tag_id in tag_query_results: for tag_id in tag_query_results:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature) self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature,
self.linucb_matrix_redis_prefix)
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature_ctr,
self.ctr_linucb_matrix_redis_prefix)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后 # 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self.update_recommend_tag_list(device_id, user_feature, user_id, self.update_recommend_tag_list(device_id, user_feature, user_id,
new_user_click_tag_list=tag_query_results) new_user_click_tag_list=tag_query_results,
linucb_matrix_prefix=self.linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.linucb_recommend_pictorial_id_prefix)
self.update_recommend_tag_list(device_id, user_feature_ctr, user_id,
new_user_click_tag_list=tag_query_results,
linucb_matrix_prefix=self.ctr_linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.ctr_linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.ctr_linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.ctr_linucb_recommend_pictorial_id_prefix)
else: else:
logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type")) logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
# 用户点击个性化push进linucb # 用户点击个性化push进linucb
...@@ -357,9 +442,20 @@ class CollectData(object): ...@@ -357,9 +442,20 @@ class CollectData(object):
is_vote = 0 is_vote = 0
reward = 1 if is_click or is_vote else 0 reward = 1 if is_click or is_vote else 0
for tag_id in tag_query_results: for tag_id in tag_query_results:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature) self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature,
self.linucb_matrix_redis_prefix)
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature_ctr,
self.ctr_linucb_matrix_redis_prefix)
self.update_recommend_tag_list(device_id, user_feature, user_id, self.update_recommend_tag_list(device_id, user_feature, user_id,
new_user_click_tag_list=tag_query_results) linucb_matrix_prefix=self.linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.linucb_recommend_pictorial_id_prefix)
self.update_recommend_tag_list(device_id, user_feature_ctr, user_id,
linucb_matrix_prefix=self.ctr_linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.ctr_linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.ctr_linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.ctr_linucb_recommend_pictorial_id_prefix)
logging.info("on_click_push topic type:%s, device_id:%s, tag_ids:%s" % logging.info("on_click_push topic type:%s, device_id:%s, tag_ids:%s" %
(raw_val_dict.get("type", "missing type"), str(device_id), (raw_val_dict.get("type", "missing type"), str(device_id),
str(tagid_list))) str(tagid_list)))
...@@ -383,11 +479,23 @@ class CollectData(object): ...@@ -383,11 +479,23 @@ class CollectData(object):
reward = 1 if is_click or is_vote else 0 reward = 1 if is_click or is_vote else 0
for i in range(5): for i in range(5):
for tag_id in tag_query_results_multi: for tag_id in tag_query_results_multi:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature) self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature,
self.linucb_matrix_redis_prefix)
self.update_user_linucb_tag_info(reward, device_id, tag_id,
user_feature_ctr,
self.ctr_linucb_matrix_redis_prefix)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后 # 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self.update_recommend_tag_list(device_id, user_feature, user_id, self.update_recommend_tag_list(device_id, user_feature, user_id,
new_user_click_tag_list=tag_query_results) linucb_matrix_prefix=self.linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.linucb_recommend_pictorial_id_prefix)
self.update_recommend_tag_list(device_id, user_feature_ctr, user_id,
linucb_matrix_prefix=self.ctr_linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.ctr_linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.ctr_linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.ctr_linucb_recommend_pictorial_id_prefix)
logging.info("skin_check topic type:%s, device_id:%s, tag_query_results:%s" % logging.info("skin_check topic type:%s, device_id:%s, tag_query_results:%s" %
(str(data['SYS']['action']), str(device_id), str(tag_query_results))) (str(data['SYS']['action']), str(device_id), str(tag_query_results)))
else: else:
...@@ -398,7 +506,6 @@ class CollectData(object): ...@@ -398,7 +506,6 @@ class CollectData(object):
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
# 假设数据库连接异常,强制退出程序,supervisor重启linub # 假设数据库连接异常,强制退出程序,supervisor重启linub
os._exit(0) os._exit(0)
return True return True
except: except:
logging_exception() logging_exception()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment