Commit 672f78f6 authored by 段英荣

Merge branch 'gyz' into 'master'

Gyz

See merge request alpha/physical!491
parents 930ee89a 8dd1722d
......@@ -15,6 +15,7 @@ from libs.es import ESPerform
from search.utils.common import *
import libs.tools as Tools
from trans2es.models.pictorial import CommunityPictorialHomeFeed
from trans2es.models.portrait_stat import LikeDeviceTagStat
from libs.error import logging_exception
import os
from search.views.tag import get_same_tagset_ids
......@@ -51,12 +52,16 @@ class CollectData(object):
def __init__(self):
# lin tag参数
self.linucb_matrix_redis_prefix = "physical:linucb:device_id:"
self.ctr_linucb_matrix_redis_prefix = "ctr_physical:linucb:device_id:"
# lin推荐tag
self.linucb_recommend_redis_prefix = "physical:linucb:tag_recommend:device_id:"
self.ctr_linucb_recommend_redis_prefix = "ctr_physical:linucb:tag_recommend:device_id:"
# 推荐帖子
self.linucb_recommend_topic_id_prefix = "physical:linucb:topic_recommend:device_id:"
self.ctr_linucb_recommend_topic_id_prefix = "ctr_physical:linucb:topic_recommend:device_id:"
# 推荐榜单
self.linucb_recommend_pictorial_id_prefix = "physical:linucb:pictorial_recommend:device_id:"
self.ctr_linucb_recommend_pictorial_id_prefix = "ctr_physical:linucb:pictorial_recommend:device_id:"
self.tag_topic_id_redis_prefix = "physical:tag_id:topic_id_list:"
self.click_recommend_redis_key_prefix = "physical:click_recommend:device_id:"
......@@ -64,9 +69,9 @@ class CollectData(object):
self.user_feature = [0, 1]
def _get_user_linucb_info(self, device_id):
def _get_user_linucb_info(self, device_id, linucb_matrix_prefix):
try:
redis_key = self.linucb_matrix_redis_prefix + str(device_id)
redis_key = linucb_matrix_prefix + str(device_id)
# dict的key为标签ID,value为4个矩阵
redis_linucb_tag_data_dict = redis_client.hgetall(redis_key)
......@@ -77,21 +82,26 @@ class CollectData(object):
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return dict()
def update_recommend_tag_list(self, device_id,user_feature=None,user_id=None,click_topic_tag_list=None,new_user_click_tag_list = []):
def update_recommend_tag_list(self, device_id, user_feature=None, user_id=None, click_topic_tag_list=None,
new_user_click_tag_list=[], linucb_matrix_prefix=None, linucb_recommend_tag_prefix=None,
linucb_topic_ids_prefix=None, linucb_pictorial_ids_prefix=None):
try:
redis_linucb_tag_data_dict = self._get_user_linucb_info(device_id)
redis_linucb_tag_data_dict = self._get_user_linucb_info(device_id, linucb_matrix_prefix)
if len(redis_linucb_tag_data_dict) == 0:
recommend_tag_list = list(LinUCB.get_default_tag_list(user_id))
LinUCB.init_device_id_linucb_info(redis_client, self.linucb_matrix_redis_prefix,device_id,recommend_tag_list)
LinUCB.init_device_id_linucb_info(redis_client, linucb_matrix_prefix, device_id, recommend_tag_list)
else:
user_feature = user_feature if user_feature else self.user_feature
(recommend_tag_dict,recommend_tag_set) = LinUCB.linucb_recommend_tag(device_id,redis_linucb_tag_data_dict,user_feature,list(redis_linucb_tag_data_dict.keys()))
linucb_tag_list = list(redis_linucb_tag_data_dict.keys())
(recommend_tag_dict, recommend_tag_set) = LinUCB.linucb_recommend_tag(device_id,
redis_linucb_tag_data_dict,
user_feature, linucb_tag_list)
recommend_tag_list = list(recommend_tag_dict.keys())
if len(recommend_tag_list) > 0:
tag_recommend_redis_key = self.linucb_recommend_redis_prefix + str(device_id)
tag_recommend_redis_key = linucb_recommend_tag_prefix + str(device_id)
redis_client.set(tag_recommend_redis_key, json.dumps(recommend_tag_list))
# redis_client.expire(tag_recommend_redis_key, 7*24*60*60)
redis_client.expire(tag_recommend_redis_key, 30*24*60*60)
have_read_topic_id_list = Tools.get_have_read_topic_id_list(device_id,user_id,TopicPageType.HOME_RECOMMEND)
have_read_lin_pictorial_id_list = Tools.get_have_read_lin_pictorial_id_list(device_id, user_id,
......@@ -125,8 +135,8 @@ class CollectData(object):
# redis_client.hmset(click_recommend_redis_key, click_redis_data_dict)
tag_id_list = recommend_tag_list[0:20]
pictorial_recommend_redis_key = self.linucb_recommend_pictorial_id_prefix + str(device_id)
topic_recommend_redis_key = self.linucb_recommend_topic_id_prefix + str(device_id)
pictorial_recommend_redis_key = linucb_pictorial_ids_prefix + str(device_id)
topic_recommend_redis_key = linucb_topic_ids_prefix + str(device_id)
# redis_topic_data_dict = redis_client.hgetall(topic_recommend_redis_key)
# redis_topic_list = list()
# cursor = -1
......@@ -171,19 +181,74 @@ class CollectData(object):
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
def update_user_linucb_tag_info(self, reward, device_id, tag_id, user_feature=None):
def update_user_linucb_tag_info(self, reward, device_id, tag_id, user_feature, linucb_matrix_redis_prefix):
try:
user_feature = user_feature if user_feature else self.user_feature
return LinUCB.update_linucb_info(user_feature, reward, tag_id, device_id,self.linucb_matrix_redis_prefix,redis_client)
return LinUCB.update_linucb_info(user_feature, reward, tag_id, device_id, linucb_matrix_redis_prefix, redis_client)
except:
logging_exception()
logging.error("update_user_linucb_tag_info error!")
return False
def transfer_old_info2ctr_feature_key(self, device_id):
    """Seed a device's CTR-strategy LinUCB redis keys from its legacy keys.

    If the CTR-strategy tag-parameter hash already exists for the device,
    nothing is copied. Otherwise each legacy key (tag-parameter hash,
    recommended tag list, topic recommend hash, pictorial recommend hash)
    is copied to its ``ctr_``-prefixed counterpart, and a CTR-strategy key
    that already exists is never overwritten.

    :param device_id: device identifier appended to every redis key prefix.
    :return: True when the migration ran or was not needed, False when an
        exception was caught (the exception is logged, not re-raised).
    """
    # NOTE(review): the four copy steps are treated as independent of each
    # other (a missing legacy tag-parameter hash does not skip the later
    # steps) - confirm against the intended nesting.
    try:
        device_suffix = str(device_id)

        # Tag parameters: if the new strategy already has LinUCB info for
        # this device, no migration is needed at all.
        ctr_matrix_key = self.ctr_linucb_matrix_redis_prefix + device_suffix
        if redis_client.exists(ctr_matrix_key):
            return True
        # Read first and test the result instead of exists()+hgetall(): a key
        # that disappears between the two calls would otherwise hand hmset an
        # empty mapping and abort the whole migration.
        legacy_matrix = redis_client.hgetall(self.linucb_matrix_redis_prefix + device_suffix)
        if legacy_matrix:
            redis_client.hmset(ctr_matrix_key, legacy_matrix)

        # Recommended tag list (stored as a plain string value).
        ctr_tag_key = self.ctr_linucb_recommend_redis_prefix + device_suffix
        if not redis_client.exists(ctr_tag_key):
            legacy_tags = redis_client.get(self.linucb_recommend_redis_prefix + device_suffix)
            if legacy_tags is not None:
                redis_client.set(ctr_tag_key, legacy_tags)

        # Topic recommend queue, then pictorial recommend queue (both hashes).
        hash_prefix_pairs = (
            (self.linucb_recommend_topic_id_prefix,
             self.ctr_linucb_recommend_topic_id_prefix),
            (self.linucb_recommend_pictorial_id_prefix,
             self.ctr_linucb_recommend_pictorial_id_prefix),
        )
        for legacy_prefix, ctr_prefix in hash_prefix_pairs:
            ctr_key = ctr_prefix + device_suffix
            if redis_client.exists(ctr_key):
                continue
            legacy_queue = redis_client.hgetall(legacy_prefix + device_suffix)
            if legacy_queue:
                redis_client.hmset(ctr_key, legacy_queue)

        logging.info("transfer_old_info2ctr_feature_key sucess:"+str(device_id))
        return True
    except Exception:
        logging_exception()
        logging.error("transfer_old_info2ctr_feature_key error!")
        return False
def get_device_tag_ctr(self, device_id, tag_id):
    """Return the stored ``tag_ctr_30`` value of a tag for a device.

    Looks up ``LikeDeviceTagStat`` on the ``settings.SLAVE1_DB_NAME``
    database, filtered by ``device_id`` and ``tag_id``.

    :param device_id: device identifier to filter on.
    :param tag_id: tag identifier to filter on.
    :return: ``tag_ctr_30`` of the first matching row, or 0.0 when no row
        matches, the stored value is NULL, or the lookup raises.
    """
    try:
        stat_rows = LikeDeviceTagStat.objects.using(settings.SLAVE1_DB_NAME).filter(
            device_id=device_id, tag_id=tag_id).values("tag_ctr_30")
        device_tag_ctr_value = stat_rows[0].get("tag_ctr_30") if stat_rows else None
        if device_tag_ctr_value is None:
            # The caller places this value directly into the LinUCB feature
            # vector, so a missing/NULL CTR must come back as a number.
            device_tag_ctr_value = 0.0
        logging.info("get_device_tag_ctr" + str(device_id) + str(tag_id))
        return device_tag_ctr_value
    except Exception:
        # Best effort: a failed lookup is logged and treated as "no CTR yet".
        logging_exception()
        logging.error("get_device_tag_ctr error!")
        return 0.0
def consume_data_from_kafka(self,topic_name=None):
try:
user_feature = [1,1]
user_feature = [1, 1]
kafka_consumer_obj = KafkaManager.get_kafka_consumer_ins(topic_name)
while True:
......@@ -245,12 +310,34 @@ class CollectData(object):
reward = 1 if is_click or is_vote else 0
# 移植老用户的lin信息到ctr特征策略
self.transfer_old_info2ctr_feature_key(device_id)
# 更新不同策略的lin标签参数信息
for tag_id in click_topic_tag_list:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature,
self.linucb_matrix_redis_prefix)
# 获取tag的ctr信息
device_tag_ctr = self.get_device_tag_ctr(device_id, tag_id)
user_feature_ctr = [device_tag_ctr, device_tag_ctr]
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature_ctr,
self.ctr_linucb_matrix_redis_prefix)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
if len(click_topic_tag_list)>0:
self.update_recommend_tag_list(device_id, user_feature, user_id,click_topic_tag_list=click_topic_tag_list)
self.update_recommend_tag_list(device_id, user_feature, user_id,
click_topic_tag_list=click_topic_tag_list,
linucb_matrix_prefix=self.linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.linucb_recommend_pictorial_id_prefix)
self.update_recommend_tag_list(device_id, user_feature, user_id,
click_topic_tag_list=click_topic_tag_list,
linucb_matrix_prefix=self.ctr_linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.ctr_linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.ctr_linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.ctr_linucb_recommend_pictorial_id_prefix)
# elif "type" in raw_val_dict and "page_precise_exposure" == raw_val_dict["type"]:
# if isinstance(raw_val_dict["params"]["exposure_cards"],str):
# exposure_cards_list = json.loads(raw_val_dict["params"]["exposure_cards"])
......@@ -324,12 +411,32 @@ class CollectData(object):
is_vote = 0
reward = 1 if is_click or is_vote else 0
# 移植老用户的lin信息到ctr特征策略
self.transfer_old_info2ctr_feature_key(device_id)
for tag_id in tag_query_results:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature,
self.linucb_matrix_redis_prefix)
# 获取tag的ctr信息
device_tag_ctr = self.get_device_tag_ctr(device_id, tag_id)
user_feature_ctr = [device_tag_ctr, device_tag_ctr]
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature_ctr,
self.ctr_linucb_matrix_redis_prefix)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self.update_recommend_tag_list(device_id, user_feature, user_id,
new_user_click_tag_list=tag_query_results)
new_user_click_tag_list=tag_query_results,
linucb_matrix_prefix=self.linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.linucb_recommend_pictorial_id_prefix)
self.update_recommend_tag_list(device_id, user_feature, user_id,
new_user_click_tag_list=tag_query_results,
linucb_matrix_prefix=self.ctr_linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.ctr_linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.ctr_linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.ctr_linucb_recommend_pictorial_id_prefix)
else:
logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
# 用户点击个性化push进linucb
......@@ -357,9 +464,23 @@ class CollectData(object):
is_vote = 0
reward = 1 if is_click or is_vote else 0
for tag_id in tag_query_results:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature,
self.linucb_matrix_redis_prefix)
# 获取tag的ctr信息
device_tag_ctr = self.get_device_tag_ctr(device_id, tag_id)
user_feature_ctr = [device_tag_ctr, device_tag_ctr]
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature_ctr,
self.ctr_linucb_matrix_redis_prefix)
self.update_recommend_tag_list(device_id, user_feature, user_id,
linucb_matrix_prefix=self.linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.linucb_recommend_pictorial_id_prefix)
self.update_recommend_tag_list(device_id, user_feature, user_id,
new_user_click_tag_list=tag_query_results)
linucb_matrix_prefix=self.ctr_linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.ctr_linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.ctr_linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.ctr_linucb_recommend_pictorial_id_prefix)
logging.info("on_click_push topic type:%s, device_id:%s, tag_ids:%s" %
(raw_val_dict.get("type", "missing type"), str(device_id),
str(tagid_list)))
......@@ -383,11 +504,26 @@ class CollectData(object):
reward = 1 if is_click or is_vote else 0
for i in range(5):
for tag_id in tag_query_results_multi:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature,
self.linucb_matrix_redis_prefix)
# 获取tag的ctr信息
device_tag_ctr = self.get_device_tag_ctr(device_id, tag_id)
user_feature_ctr = [device_tag_ctr, device_tag_ctr]
self.update_user_linucb_tag_info(reward, device_id, tag_id,
user_feature_ctr,
self.ctr_linucb_matrix_redis_prefix)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self.update_recommend_tag_list(device_id, user_feature, user_id,
new_user_click_tag_list=tag_query_results)
linucb_matrix_prefix=self.linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.linucb_recommend_pictorial_id_prefix)
self.update_recommend_tag_list(device_id, user_feature, user_id,
linucb_matrix_prefix=self.ctr_linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.ctr_linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.ctr_linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.ctr_linucb_recommend_pictorial_id_prefix)
logging.info("skin_check topic type:%s, device_id:%s, tag_query_results:%s" %
(str(data['SYS']['action']), str(device_id), str(tag_query_results)))
else:
......@@ -398,7 +534,6 @@ class CollectData(object):
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
# 假设数据库连接异常,强制退出程序,supervisor重启linub
os._exit(0)
return True
except:
logging_exception()
......
......@@ -125,7 +125,10 @@ def get_home_recommend_topic_ids(user_id, device_id, tag_id, offset, size, query
have_read_topic_id_list.extend(useful_topic_id_list)
# linucb 推荐帖子
topic_recommend_redis_key = "physical:linucb:topic_recommend:device_id:" + str(device_id)
if STRATEGY_TYPE.CTR_GRAY in gray_list:
topic_recommend_redis_key = "ctr_physical:linucb:topic_recommend:device_id:" + str(device_id)
else:
topic_recommend_redis_key = "physical:linucb:topic_recommend:device_id:" + str(device_id)
recommend_topic_dict = redis_client.hgetall(topic_recommend_redis_key)
linucb_recommend_topic_id_list = list()
......@@ -257,7 +260,10 @@ def get_home_recommend_topic_ids(user_id, device_id, tag_id, offset, size, query
def get_home_recommend_pictorial_ids(user_id=-1, device_id="", size=4, gray_list=[]):
try:
pictorial_recommend_redis_key = "physical:linucb:pictorial_recommend:device_id:" + str(device_id)
if STRATEGY_TYPE.CTR_GRAY in gray_list:
pictorial_recommend_redis_key = "ctr_physical:linucb:pictorial_recommend:device_id:"+str(device_id)
else:
pictorial_recommend_redis_key = "physical:linucb:pictorial_recommend:device_id:" + str(device_id)
have_read_lin_pictorial_id_list = get_have_read_lin_pictorial_id_list(device_id, user_id,
TopicPageType.HOME_RECOMMEND)
pictorial_recommend_dict = redis_client.hgetall(pictorial_recommend_redis_key)
......
import json
import logging
import datetime
from libs.cache import redis_client
from libs.error import logging_exception
from django.conf import settings
from trans2es.models.portrait_stat import LikeDeviceTagStat
# Subscribe to the "new_topic_impression" redis channel, keep a running
# impression count per topic in one JSON blob, and create a topic stat row
# for every topic whose count passes 100.
try:
    # The rows created below use topic_id / is_new_topic / topic_ctr_30 /
    # like_rate_30, which are LikeTopicStat fields; LikeDeviceTagStat has none
    # of them and rejects those keyword arguments.
    # NOTE(review): assumes LikeTopicStat is defined alongside
    # LikeDeviceTagStat in trans2es.models.portrait_stat - confirm.
    from trans2es.models.portrait_stat import LikeTopicStat

    ps = redis_client.pubsub()
    ps.subscribe("new_topic_impression")
    all_new_topic_impression_count_key = "all_new_topic_impression_count_key"
    for item in ps.listen():
        # Only published payloads carry topic ids; skip every other event type.
        if item['type'] != 'message':
            continue

        new_topic_ids = json.loads(item["data"])
        # The counter blob does not exist before the first message is
        # processed; start from an empty mapping instead of failing on None.
        raw_counts = redis_client.get(all_new_topic_impression_count_key)
        all_new_topic_impression_count = json.loads(raw_counts) if raw_counts else {}

        insert_topic_ids = []
        for topic in new_topic_ids:
            topic = str(topic)  # JSON object keys are always strings
            impressions = all_new_topic_impression_count.get(topic, 0) + 1
            if impressions > 100:
                # Threshold passed: schedule a stat row and stop counting.
                insert_topic_ids.append(int(topic))
                all_new_topic_impression_count.pop(topic, None)
            else:
                all_new_topic_impression_count[topic] = impressions

        if insert_topic_ids:
            now = datetime.datetime.today()
            insert_list = [
                LikeTopicStat(create_time=now, update_time=now, topic_id=topic_id,
                              is_new_topic=0, topic_ctr_30=0.0, like_rate_30=0.0)
                for topic_id in insert_topic_ids
            ]
            LikeTopicStat.objects.using(settings.MASTER_DB_NAME).bulk_create(insert_list)
            logging.info("impressions count gt 100 topic ids" + str(insert_topic_ids))

        json_all_new_topic_impression_count = json.dumps(all_new_topic_impression_count)
        logging.info("all_new_topic_impression_count" + str(all_new_topic_impression_count))
        redis_client.set(all_new_topic_impression_count_key, json_all_new_topic_impression_count)
except Exception:
    # Any failure ends the listen loop; it is logged here and not re-raised.
    logging_exception()
    logging.error("redis new topic sub error!")
from django.db import models
class LikeDeviceTagStat(models.Model):
    """One statistics row per (device, tag) pair, table ``like_device_tag_stat``.

    The verbose names describe the counters as covering the last 30 days.
    """

    # Primary key.
    id = models.IntegerField(verbose_name=u"主键ID", primary_key=True)
    # When the statistic row was created / last updated.
    create_time = models.DateTimeField(verbose_name=u'统计创建时间')
    update_time = models.DateTimeField(verbose_name=u'统计更新时间')
    # The (device, tag) pair the row describes.
    device_id = models.CharField(max_length=100, verbose_name=u'设备id')
    tag_id = models.IntegerField(verbose_name=u'标签id')
    # Click count, impression count and CTR.
    tag_click_30 = models.IntegerField(verbose_name=u'30天内的点击数')
    tag_impress_30 = models.IntegerField(verbose_name=u"30天内的曝光数")
    tag_ctr_30 = models.FloatField(verbose_name=u"30天内的ctr")

    class Meta:
        verbose_name = u"30天内设备的tag的stat"
        db_table = "like_device_tag_stat"
        # At most one row per device/tag combination.
        unique_together = ("device_id", "tag_id")
class LikeTopicStat(models.Model):
    """One statistics row per topic, table ``like_topic_stat``.

    The verbose names describe the rates as covering the last 30 days.
    """

    class Meta:
        verbose_name = u"30天内回答的stat"
        # The table name must not carry trailing whitespace: the previous
        # value "like_topic_stat " pointed at a differently named table.
        db_table = "like_topic_stat"

    # Primary key.
    id = models.IntegerField(primary_key=True, verbose_name=u"主键ID")
    # When the statistic row was created / last updated.
    create_time = models.DateTimeField(verbose_name=u'统计创建时间')
    update_time = models.DateTimeField(verbose_name=u'统计更新时间')
    # Topic the row describes; unique, so one row per topic.
    topic_id = models.IntegerField(verbose_name=u'回答id', unique=True)
    # Flag marking whether the topic is a new post.
    is_new_topic = models.IntegerField(verbose_name=u"是否是新帖")
    # CTR and like rate.
    topic_ctr_30 = models.FloatField(verbose_name=u"30天内回答的ctr")
    like_rate_30 = models.FloatField(verbose_name=u"30天内回答的点赞率")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment