Commit 71973498 authored by 段英荣's avatar 段英荣

Merge branch 'linUCB' into 'master'

Lin ucb

See merge request !124
parents 14f87e0e 4912fca2
from django.contrib import admin
# Register your models here.
from django.db import models
# Create your models here.
from django.test import TestCase
# Create your tests here.
from django.shortcuts import render
# Create your views here.
# -*- coding: UTF-8 -*-
# !/usr/bin/env python
from kafka import KafkaConsumer
from libs.cache import redis_client
import logging
from linucb.views.linucb import LinUCB
import json
from trans2es.models.tag import TopicTag
class KafkaManager(object):
# kafka信息
kafka_broker_list = "192.168.13.114:9092,192.168.13.116:9092,192.168.13.115:9092"
topic_name = "alpha-maidian-data"
consumser_obj = None
@classmethod
def get_kafka_consumer_ins(cls, topic_name=None):
if not cls.consumser_obj:
topic_name = cls.topic_name if not topic_name else topic_name
cls.consumser_obj = KafkaConsumer(bootstrap_servers=cls.kafka_broker_list)
cls.consumser_obj.subscribe([topic_name])
return cls.consumser_obj
class CollectData(object):
def __init__(self):
self.linucb_matrix_redis_prefix = "physical:linucb:device_id:"
self.linucb_recommend_redis_prefix = "physical:linucb:tag_recommend:device_id:"
# 默认
self.user_feature = [0,1]
def _get_user_linucb_info(self, device_id):
try:
redis_key = self.linucb_matrix_redis_prefix + str(device_id)
# dict的key为标签ID,value为4个矩阵
redis_linucb_tag_data_dict = redis_client.hgetall(redis_key)
return redis_linucb_tag_data_dict
except:
logging.error("get_user_linucb_info error!")
return dict()
def update_recommend_tag_list(self, device_id,user_feature=None):
try:
recommend_tag_list = list()
redis_linucb_tag_data_dict = self._get_user_linucb_info(device_id)
if len(redis_linucb_tag_data_dict) == 0:
recommend_tag_list = LinUCB.get_default_tag_list()
LinUCB.init_device_id_linucb_info(redis_client, self.linucb_matrix_redis_prefix,device_id,recommend_tag_list)
else:
user_feature = user_feature if user_feature else self.user_feature
recommend_tag_list = LinUCB.linucb_recommend_tag(redis_linucb_tag_data_dict,user_feature,redis_linucb_tag_data_dict.keys())
if len(recommend_tag_list) > 0:
tag_recommend_redis_key = self.linucb_recommend_redis_prefix + str(device_id)
redis_client.set(tag_recommend_redis_key, recommend_tag_list)
# Todo:设置过期时间,调研set是否支持
redis_client.expire(tag_recommend_redis_key, 7*24*60*60)
return True
except:
logging.error("get_recommend_tag_list error!")
return False
def update_user_linucb_tag_info(self, reward, device_id, tag_id, user_feature=None):
try:
user_feature = user_feature if user_feature else self.user_feature
return LinUCB.update_linucb_info(user_feature, reward, tag_id, device_id,self.linucb_matrix_redis_prefix,redis_client)
except:
logging.error("update_user_linucb_tag_info error!")
return False
def consume_data_from_kafka(self,topic_name=None):
try:
kafka_consumer_obj = KafkaManager.get_kafka_consumer_ins(topic_name)
for ori_msg in kafka_consumer_obj:
logging.info(ori_msg)
value_dict = json.loads(ori_msg.value)
if "@raw" in value_dict and "type" in value_dict["@raw"] and "on_click_feed_topic_card"==value_dict["@raw"]["type"]:
topic_id = value_dict["@raw"]["params"]["business_id"]
device_id = value_dict["@raw"]["device"]["device_id"]
logging.info("consume topic_id:%s,device_id:%s" % (str(topic_id),str()))
tag_list = list()
sql_query_results = TopicTag.objects.filter(is_online=True,topic_id=topic_id)
for sql_item in sql_query_results:
tag_list.append(sql_item.tag_id)
is_click = 1
is_vote = 0
user_feature = [is_click, is_vote]
reward = 1 if is_click or is_vote else 0
for tag_id in tag_list:
self.update_user_linucb_tag_info(reward,device_id,tag_id,user_feature)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self.update_recommend_tag_list(device_id, user_feature)
return True
except:
logging.error("consume_data_from_kafka error!")
return False
# -*- coding: UTF-8 -*-
# !/usr/bin/env python
import numpy as np
import redis
from libs.cache import redis_client
from trans2es.models.tag import Tag
import logging
class LinUCB:
d = 2
alpha = 0.25
r1 = 1
r0 = 0
default_tag_list = list()
@classmethod
def get_default_tag_list(cls):
try:
if len(cls.default_tag_list) == 0:
query_item_results = Tag.objects.query(is_online=True)
for item in query_item_results:
cls.default_tag_list.append(item.id)
return cls.default_tag_list[:20]
except:
logging.error("get_tag_list error!")
return list()
@classmethod
def linucb_recommend_tag(cls,redis_linucb_tag_data_dict,user_features_list,tag_list):
"""
:remark 获取推荐标签
:param redis_linucb_tag_data_dict:
:param user_features_list:
:param tag_list:
:return:
"""
try:
Aa_list = list()
for tag_id in redis_linucb_tag_data_dict:
Aa_list.append(redis_linucb_tag_data_dict[tag_id]["Aa"])
theta_list = list()
for tag_id in redis_linucb_tag_data_dict:
theta_list.append(redis_linucb_tag_data_dict[tag_id]["theta"])
xaT = np.array([user_features_list])
xa = np.transpose(xaT)
art_max = -1
old_pa = 0
AaI_tmp = np.array(Aa_list)
theta_tmp = np.array(theta_list)
art_max = tag_list[np.argmax(np.dot(xaT, theta_tmp) + cls.alpha * np.sqrt(np.dot(np.dot(xaT, AaI_tmp), xa)))]
return [art_max]
except:
logging.error("linucb_recommend_tag error!")
return []
@classmethod
def init_device_id_linucb_info(cls, redis_cli,redis_prefix, device_id, tag_list):
try:
redis_key = redis_prefix + str(device_id)
user_tag_linucb_dict = dict()
for tag_id in tag_list:
user_tag_linucb_dict[tag_id] = {
"Aa": np.identity(cls.d),
"theta": np.zeros((cls.d, 1)),
"ba": np.zeros((cls.d, 1)),
"AaI": np.identity(cls.d)
}
redis_cli.hmset(redis_key, user_tag_linucb_dict)
return True
except:
logging.error("init_device_id_linucb_info error!")
return False
@classmethod
def update_linucb_info(cls, user_features,reward, tag_id, device_id, redis_prefix,redis_cli):
try:
if reward == -1:
logging.warning("reward val error!")
elif reward == 1 or reward == 0:
if reward == 1:
r = cls.r1
else:
r = cls.r0
xaT = np.array([user_features])
xa = np.transpose(xaT)
redis_key = redis_prefix + str(device_id)
ori_redis_tag_dict = redis_cli.hget(redis_key, tag_id)
new_Aa_matrix = ori_redis_tag_dict["Aa"] + np.dot(xa, xaT)
new_AaI_matrix = np.linalg.solve(new_Aa_matrix, np.identity(cls.d))
new_ba_matrix = ori_redis_tag_dict["ba"] + r*xa
user_tag_dict = {
"Aa": new_Aa_matrix,
"ba": new_ba_matrix,
"AaI": new_AaI_matrix,
"theta": np.dot(new_AaI_matrix, new_ba_matrix)
}
redis_cli.hset(redis_key, tag_id, user_tag_dict)
return True
except:
logging.error("update_linucb_info error!")
return False
\ No newline at end of file
......@@ -93,7 +93,32 @@ class TopicUtils(object):
return []
@classmethod
def get_recommend_topic_ids(cls,user_id,offset,size,query=None,query_type=TopicPageType.FIND_PAGE,filter_topic_id_list=[],test_score=False,must_topic_id_list=[]):
def get_topic_tag_info(cls, offset, size, topic_id_list):
try:
q = {
"query":{
"terms":{
"id": topic_id_list
}
},
"_source":{
"includes": ["id", "group_id", "offline_score", "user_id", "edit_tag_list"]
}
}
result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="topic", query_body=q,
offset=offset, size=size)
topic_id_dict = dict()
for item in result_dict["hits"]:
topic_id_dict[item["_source"]["id"]] = item["_source"]["edit_tag_list"]
return topic_id_dict
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return {}
@classmethod
def get_recommend_topic_ids(cls,user_id,offset,size,query=None,query_type=TopicPageType.FIND_PAGE,filter_topic_id_list=[],test_score=False,must_topic_id_list=[],recommend_tag_list=[]):
"""
:需增加打散逻辑
:remark:获取首页推荐帖子列表
......@@ -195,6 +220,15 @@ class TopicUtils(object):
"weight": 1
}
)
if len(recommend_tag_list)>0:
functions_list.append(
{
"filter":{"bool":{
"should":{"terms":{"edit_tag_list":recommend_tag_list}}}},
"weight": 3
}
)
low_content_level = 4 if query_type==TopicPageType.FIND_PAGE else 3
query_function_score = {
......@@ -249,7 +283,7 @@ class TopicUtils(object):
"field": "user_id"
}
q["_source"] = {
"includes":["id","group_id","offline_score","user_id"]
"includes":["id","group_id","offline_score","user_id","edit_tag_list"]
}
q["sort"] = [
{
......
......@@ -58,13 +58,16 @@ def get_home_recommend_topic_ids(user_id,device_id,offset,size,query=None,query_
redis_field_list = [b'have_read_topic_list']
redis_field_val_list = redis_client.hmget(redis_key,redis_field_list)
tag_recommend_redis_key = "physical:linucb:tag_recommend:device_id:" + str(device_id)
recommend_tag_list = redis_client.get(tag_recommend_redis_key) or []
recommend_topic_ids = []
have_read_topic_id_list = list()
if redis_field_val_list[0]:
have_read_topic_id_list = list(json.loads(redis_field_val_list[0]))
topic_id_dict = TopicUtils.get_recommend_topic_ids(user_id, offset, size*size,query,query_type,filter_topic_id_list=have_read_topic_id_list)
topic_id_dict = TopicUtils.get_recommend_topic_ids(user_id, offset, size*size,query,query_type,filter_topic_id_list=have_read_topic_id_list,recommend_tag_list=recommend_tag_list)
have_read_group_id_set = set()
have_read_user_id_set = set()
unread_topic_id_dict = dict()
......@@ -205,6 +208,14 @@ def get_home_recommend_topic_ids(user_id,device_id,offset,size,query=None,query_
"""
@bind("physical/search/query_tag_id_by_topic")
def query_tag_id_by_topic(offset=0,size=10,topic_id_list=[]):
try:
return TopicUtils.get_topic_tag_info(offset,size,topic_id_list)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return {}
@bind("physical/search/home_recommend")
def home_recommend(device_id="",user_id=-1,offset=0,size=10,query_type=TopicPageType.HOME_RECOMMEND):
"""
......
......@@ -26,7 +26,7 @@ from trans2es.models.user_extra import UserExtra
from trans2es.models.group import Group
from trans2es.models.topic import Topic,ActionSumAboutTopic
from search.utils.common import *
from linucb.views.collect_data import CollectData
class Job(object):
__es = None
......@@ -203,5 +203,9 @@ class Command(BaseCommand):
SyncDataToRedis.sync_face_similar_data_to_redis()
if len(options["test_score"]):
self.generate_topic_score_detail()
if len(options["sync_type"]) and options["sync_type"]=="linucb":
collect_obj = CollectData()
collect_obj.consume_data_from_kafka()
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......@@ -19,6 +19,9 @@
},
"name":{"type":"text","analyzer":"gm_default_index","search_analyzer":"gm_default_index"},
"tag_type":{"type":"long"},
"collection":{"type":"long"},
"is_ai":{"type":"long"},
"is_own":{"type":"long"},
"is_online":{"type":"keyword"},//上线
"is_deleted":{"type":"keyword"},
"near_new_topic_num":{"type":"long","store": true}
......
......@@ -13,6 +13,8 @@
"user_id":{"type":"long"},
"group_id":{"type":"long"}, //所在组ID
"tag_list":{"type":"long"},//标签属性
"edit_tag_list":{"type":"long"},//编辑标签
"tag_name_list":{"type":"text","analyzer":"gm_default_index","search_analyzer":"gm_default_index"},
"share_num":{"type":"long"},
"pick_id_list":{"type":"long"},
......
......@@ -45,6 +45,9 @@ class Tag(models.Model):
name = models.CharField(verbose_name=u"标签名称",max_length=128)
description = models.TextField(verbose_name=u"标签描述")
icon_url=models.CharField(verbose_name=u"icon_url",max_length=120)
collection = models.IntegerField(verbose_name=u"是否编辑")
is_ai = models.IntegerField(verbose_name=u"是否ai")
is_own = models.IntegerField(verbose_name=u"是否ins上自带")
create_time = models.DateTimeField(verbose_name=u'创建时间',default=datetime.datetime.fromtimestamp(0))
update_time = models.DateTimeField(verbose_name=u'更新时间', default=datetime.datetime.fromtimestamp(0))
......
......@@ -96,15 +96,19 @@ class Topic(models.Model):
def get_topic_tag_id_list(self):
try:
topic_tag_id_list = list()
edit_tag_id_list = list()
query_results = TopicTag.objects.filter(topic_id=self.id)
for item in query_results:
is_collection = Tag.objects.filter(is_online=True,id=item.tag_id,collection=1)
if is_collection:
edit_tag_id_list.append(item.tag_id)
topic_tag_id_list.append(item.tag_id)
return topic_tag_id_list
return (topic_tag_id_list, edit_tag_id_list)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return []
return ([],[])
def get_tag_name_list(self, tag_id_list):
try:
......
......@@ -57,7 +57,9 @@ class TagTransfer(object):
tag_type_list.append(tag_type_id)
res["tag_type"] = tag_type_list
res["collection"] = instance.collection
res["is_ai"] = instance.is_ai
res["is_own"] = instance.is_own
return res
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......
......@@ -34,7 +34,11 @@ class TopicTransfer(object):
res["share_num"] = instance.share_num
res["pick_id_list"] = instance.get_pick_id_info()
res["tag_list"] = instance.get_topic_tag_id_list()
(topic_tag_id_list, edit_tag_id_list) = instance.get_topic_tag_id_list()
res["tag_list"] = topic_tag_id_list
res["edit_tag_list"] = edit_tag_id_list
res["tag_name_list"] = instance.get_tag_name_list(res["tag_list"])
res["offline_score"] = instance.get_topic_offline_score()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment