1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# -*- coding: UTF-8 -*-
# !/usr/bin/env python
from kafka import KafkaConsumer
import random
from libs.cache import redis_client
import logging
from linucb.views.linucb import LinUCB
import json
from trans2es.models.tag import TopicTag,Tag
from trans2es.models.topic import TopicHomeRecommend
import traceback
from django.conf import settings
from libs.es import ESPerform
from search.utils.common import *
import libs.tools as Tools
class KafkaManager(object):
    """Keeps a single KafkaConsumer per process and hands it out on request."""

    # Cached consumer instance; None until the first successful creation.
    consumser_obj = None

    @classmethod
    def get_kafka_consumer_ins(cls, topic_name=None):
        """Return the cached consumer, creating it on the first call.

        Args:
            topic_name: Topic to subscribe to when the consumer is created.
                Falls back to ``settings.KAFKA_TOPIC_NAME`` when falsy. It is
                only consulted while no consumer is cached; later calls return
                the existing consumer regardless of this argument.

        Returns:
            The shared ``KafkaConsumer`` instance.
        """
        if cls.consumser_obj:
            return cls.consumser_obj

        subscribed_topic = topic_name or settings.KAFKA_TOPIC_NAME
        cls.consumser_obj = KafkaConsumer(
            subscribed_topic,
            bootstrap_servers=settings.KAFKA_BROKER_LIST,
        )
        return cls.consumser_obj
class CollectData(object):
    """Consume behaviour events from Kafka and refresh per-device LinUCB data in Redis.

    NOTE(review): this class was re-indented from a copy whose leading
    whitespace had been lost. Nesting that could not be derived from data flow
    is flagged with NOTE(review) comments; confirm against the canonical file.
    """

    # Lifetime of the cached recommended-tag list: 7 days, in seconds.
    RECOMMEND_TAG_EXPIRE_SECONDS = 7 * 24 * 60 * 60
    # Only the first N recommended tags are used to look up topics.
    RECOMMEND_TAG_LIMIT = 100
    # ``size`` passed to ESPerform.get_tag_topic_list_dict for clicked tags.
    CLICK_TOPIC_SIZE = 2
    # Reward sent to LinUCB for click / follow / interest-choice events.
    POSITIVE_REWARD = 1
    # Reward sent to LinUCB for tags that were merely exposed.
    NEGATIVE_REWARD = 0

    def __init__(self):
        """Set the Redis key prefixes and the default user feature vector."""
        self.linucb_matrix_redis_prefix = "physical:linucb:device_id:"
        # Deprecated (per the original author's note).
        self.linucb_recommend_redis_prefix = "physical:linucb:tag_recommend:device_id:"
        # Recommended topics.
        self.linucb_recommend_topic_id_prefix = "physical:linucb:topic_recommend:device_id:"
        self.tag_topic_id_redis_prefix = "physical:tag_id:topic_id_list:"
        self.click_recommend_redis_key_prefix = "physical:click_recommend:device_id:"
        # Default feature vector, used when a caller supplies none.
        self.user_feature = [0, 1]

    @staticmethod
    def _coerce_to_list(value):
        """Return ``value`` as a list: JSON-decode a str, pass a list through, else []."""
        if isinstance(value, str):
            return json.loads(value)
        if isinstance(value, list):
            return value
        return list()

    def _get_user_linucb_info(self, device_id):
        """Fetch the Redis hash stored under the device's LinUCB key.

        Returns:
            The mapping returned by ``redis_client.hgetall``; an empty dict if
            the lookup raises.
        """
        try:
            redis_key = self.linucb_matrix_redis_prefix + str(device_id)
            # Per the original note: keys are tag ids, each value holds 4 matrices.
            return redis_client.hgetall(redis_key)
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return dict()

    def update_recommend_tag_list(self, device_id, user_feature=None, user_id=None,
                                  click_topic_tag_list=None, new_user_click_tag_list=None):
        """Recompute the device's recommended tags and topics and store them in Redis.

        Args:
            device_id: Device whose recommendation data is refreshed.
            user_feature: Feature vector for LinUCB; ``self.user_feature`` when falsy.
            user_id: Passed to ``LinUCB.get_default_tag_list`` and to the
                have-read lookup; may be None.
            click_topic_tag_list: Tags of a just-clicked topic; topics found
                for them are placed in front of the stored topic list.
            new_user_click_tag_list: When non-empty, topics are looked up for
                these tags instead of the LinUCB-recommended ones. Defaults to
                an empty list (None is accepted and treated the same).

        Returns:
            True on success (including "nothing to recommend"), False if any
            step raised.
        """
        try:
            # A fresh list per call instead of a shared mutable default.
            if new_user_click_tag_list is None:
                new_user_click_tag_list = []

            redis_linucb_tag_data_dict = self._get_user_linucb_info(device_id)
            if len(redis_linucb_tag_data_dict) == 0:
                # No stored state for this device: start from the default tags.
                recommend_tag_list = LinUCB.get_default_tag_list(user_id)
                LinUCB.init_device_id_linucb_info(
                    redis_client, self.linucb_matrix_redis_prefix, device_id, recommend_tag_list)
            else:
                user_feature = user_feature if user_feature else self.user_feature
                recommend_tag_dict, _recommend_tag_set = LinUCB.linucb_recommend_tag(
                    device_id, redis_linucb_tag_data_dict, user_feature,
                    list(redis_linucb_tag_data_dict.keys()))
                recommend_tag_list = list(recommend_tag_dict.keys())

            # NOTE(review): everything below is assumed to be nested under the
            # original ``if len(recommend_tag_list) > 0`` -- confirm.
            if len(recommend_tag_list) == 0:
                return True

            tag_recommend_redis_key = self.linucb_recommend_redis_prefix + str(device_id)
            redis_client.set(tag_recommend_redis_key, json.dumps(recommend_tag_list))
            redis_client.expire(tag_recommend_redis_key, self.RECOMMEND_TAG_EXPIRE_SECONDS)

            # Topics already read plus the online home-recommend topics are
            # passed to the ES lookups together as one list.
            have_read_topic_id_list = Tools.get_have_read_topic_id_list(
                device_id, user_id, TopicPageType.HOME_RECOMMEND)
            promote_recommend_topic_id_list = TopicHomeRecommend.objects.using(
                settings.SLAVE_DB_NAME).filter(is_online=1).values_list("topic_id", flat=True)
            have_read_topic_id_list.extend(promote_recommend_topic_id_list)

            recommend_topic_id_list = list()
            recommend_topic_id_list_dict = dict()
            if click_topic_tag_list and len(click_topic_tag_list) > 0:
                click_topic_id_list, click_topic_dict = ESPerform.get_tag_topic_list_dict(
                    click_topic_tag_list, have_read_topic_id_list, size=self.CLICK_TOPIC_SIZE)
                if len(click_topic_id_list) > 0:
                    recommend_topic_id_list.extend(click_topic_id_list)
                    recommend_topic_id_list_dict.update(click_topic_dict)

            tag_id_list = recommend_tag_list[0:self.RECOMMEND_TAG_LIMIT]
            topic_recommend_redis_key = self.linucb_recommend_topic_id_prefix + str(device_id)

            # Explicit interest tags take precedence over the LinUCB tags.
            if len(new_user_click_tag_list) > 0:
                query_tag_list = new_user_click_tag_list
            else:
                query_tag_list = tag_id_list
            tag_topic_id_list, tag_topic_dict = ESPerform.get_tag_topic_list_dict(
                query_tag_list, have_read_topic_id_list)

            if len(recommend_topic_id_list) > 0 or len(tag_topic_id_list) > 0 \
                    or len(new_user_click_tag_list) > 0:
                # Click-derived topics go first.
                tag_topic_id_list = recommend_topic_id_list + tag_topic_id_list
                tag_topic_dict.update(recommend_topic_id_list_dict)
                redis_data_dict = {
                    "data": json.dumps(tag_topic_id_list),
                    "datadict": json.dumps(tag_topic_dict),
                    "cursor": 0
                }
                redis_client.hmset(topic_recommend_redis_key, redis_data_dict)
            return True
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    def update_user_linucb_tag_info(self, reward, device_id, tag_id, user_feature=None):
        """Forward one (reward, tag) observation for a device to ``LinUCB.update_linucb_info``.

        Returns:
            Whatever ``LinUCB.update_linucb_info`` returns; False if it raised.
        """
        try:
            user_feature = user_feature if user_feature else self.user_feature
            return LinUCB.update_linucb_info(
                user_feature, reward, tag_id, device_id,
                self.linucb_matrix_redis_prefix, redis_client)
        except Exception:
            # logging.exception keeps the message and appends the traceback,
            # which the original log line dropped.
            logging.exception("update_user_linucb_tag_info error!")
            return False

    def _handle_click_event(self, raw_val_dict, user_feature):
        """Handle ``on_click_feed_topic_card`` / ``tag_zone_click_focus``: positive reward."""
        click_topic_tag_list = list()
        params = raw_val_dict["params"]
        device_id = raw_val_dict["device"]["device_id"]
        user_id = raw_val_dict.get("user_id")

        if "on_click_feed_topic_card" == raw_val_dict["type"]:
            # .get() lets the topic_id fallback work when business_id is absent,
            # not only when it is present but falsy.
            topic_id = params.get("business_id") or params["topic_id"]
            logging.info("consume topic_id:%s,device_id:%s" % (str(topic_id), str(device_id)))

            topic_tag_list = list(TopicTag.objects.using(settings.SLAVE_DB_NAME).filter(
                topic_id=topic_id, is_online=True).values_list("tag_id", flat=True))
            tag_query_results = Tag.objects.using(settings.SLAVE_DB_NAME).filter(
                id__in=topic_tag_list, is_online=True, is_deleted=False).values_list(
                "id", "collection", "is_ai")
            # Only tags flagged both ``collection`` and ``is_ai`` are rewarded.
            click_topic_tag_list = [
                tag_id for tag_id, collection, is_ai in tag_query_results
                if collection and is_ai
            ]
            logging.info("positive tag_list,device_id:%s,topic_id:%s,tag_list:%s" % (
                str(device_id), str(topic_id), str(click_topic_tag_list)))
        else:
            tag_name = params["query"]
            query_type = params["type"]
            if query_type == "do":
                tag_list = list(Tag.objects.using(settings.SLAVE_DB_NAME).filter(
                    name=tag_name, is_online=True, is_deleted=False).values_list("id", flat=True))
                click_topic_tag_list.extend(tag_list)
                # NOTE(review): assumed to be logged only for type "do" -- confirm.
                logging.info("query tag attention,positive tag_list,device_id:%s,query_name:%s,tag_list:%s" % (
                    str(device_id), tag_name, str(click_topic_tag_list)))

        logging.info("click_topic_tag_list:%s" % (str(click_topic_tag_list)))

        for tag_id in click_topic_tag_list:
            self.update_user_linucb_tag_info(self.POSITIVE_REWARD, device_id, tag_id, user_feature)

        # Refresh the recommendations only after the per-tag updates are done.
        if len(click_topic_tag_list) > 0:
            self.update_recommend_tag_list(
                device_id, user_feature, user_id, click_topic_tag_list=click_topic_tag_list)

    def _handle_exposure_event(self, raw_val_dict, user_feature):
        """Handle ``page_precise_exposure``: negative reward for tags of exposed cards."""
        exposure_cards_list = self._coerce_to_list(raw_val_dict["params"]["exposure_cards"])
        device_id = raw_val_dict["device"]["device_id"]
        user_id = raw_val_dict.get("user_id")
        logging.warning("type msg:%s" % raw_val_dict.get("type"))

        exposure_topic_id_list = list()
        for item in exposure_cards_list:
            if "card_id" not in item:
                continue
            exposure_topic_id = item["card_id"]
            logging.info(
                "consume exposure topic_id:%s,device_id:%s" % (str(exposure_topic_id), str(device_id)))
            if exposure_topic_id:
                exposure_topic_id_list.append(exposure_topic_id)

        topic_tag_id_dict = dict()
        tag_list = list()
        exposure_sql_query_results = TopicTag.objects.using(settings.SLAVE_DB_NAME). \
            filter(topic_id__in=exposure_topic_id_list). \
            values_list("topic_id", "tag_id", "is_online", "is_collection")
        for topic_id, tag_id, is_online, is_collection in exposure_sql_query_results:
            if is_online and is_collection == 1:
                tag_list.append(tag_id)
            if is_online:
                tag_sql_query_results = Tag.objects.using(settings.SLAVE_DB_NAME).filter(
                    id=tag_id).values_list("id", "collection", "is_ai")
                for ai_tag_id, _collection, is_ai in tag_sql_query_results:
                    if (is_ai == 1) and ai_tag_id not in tag_list:
                        tag_list.append(ai_tag_id)
            # NOTE(review): assumed to run for every row (the dict is only
            # logged below) -- confirm it was not nested under ``if is_online``.
            topic_tag_id_dict.setdefault(topic_id, list()).append(tag_id)

        logging.info("negative tag_list,device_id:%s,topic_tag_id_dict:%s" % (
            str(device_id), str(topic_tag_id_dict)))

        for tag_id in tag_list:
            self.update_user_linucb_tag_info(self.NEGATIVE_REWARD, device_id, tag_id, user_feature)

        # Refresh the recommendations only after the per-tag updates are done.
        self.update_recommend_tag_list(device_id, user_feature, user_id)

    def _handle_interest_choice_event(self, raw_val_dict, user_feature):
        """Handle ``interest_choice_click_next``: positive reward for the chosen tags."""
        raw_tagid_list = raw_val_dict["params"]["tagid_list"]
        tagid_list = self._coerce_to_list(raw_tagid_list)
        if not isinstance(raw_tagid_list, (str, list)):
            # NOTE(review): the warning is emitted only for an unsupported
            # payload type; the lost indentation left its nesting ambiguous.
            logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))

        device_id = raw_val_dict["device"]["device_id"]
        user_id = raw_val_dict.get("user_id")

        if len(tagid_list) > 0:
            for tag_id in tagid_list:
                self.update_user_linucb_tag_info(self.POSITIVE_REWARD, device_id, tag_id, user_feature)
            # Refresh the recommendations only after the per-tag updates are done.
            self.update_recommend_tag_list(
                device_id, user_feature, user_id, new_user_click_tag_list=tagid_list)

    def _dispatch_event(self, raw_val_dict, user_feature):
        """Route one decoded message to the handler matching its ``type`` field."""
        if "type" in raw_val_dict and \
                (raw_val_dict["type"] in ("on_click_feed_topic_card", "tag_zone_click_focus")):
            self._handle_click_event(raw_val_dict, user_feature)
        elif "type" in raw_val_dict and "page_precise_exposure" == raw_val_dict["type"]:
            self._handle_exposure_event(raw_val_dict, user_feature)
        elif "type" in raw_val_dict and "interest_choice_click_next" == raw_val_dict["type"]:
            self._handle_interest_choice_event(raw_val_dict, user_feature)
        else:
            logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))

    def consume_data_from_kafka(self, topic_name=None):
        """Poll Kafka forever and apply every message to the LinUCB data.

        A failure while handling one message is logged and that message is
        skipped. The loop has no normal exit, so the method only returns when
        consumer setup or polling fails.

        Args:
            topic_name: Forwarded to ``KafkaManager.get_kafka_consumer_ins``.

        Returns:
            False if obtaining the consumer or polling raised an ``Exception``.
            ``KeyboardInterrupt`` / ``SystemExit`` are not caught and propagate,
            so the worker can be stopped.
        """
        try:
            # Feature vector used for every event consumed here.
            user_feature = [1, 1]
            kafka_consumer_obj = KafkaManager.get_kafka_consumer_ins(topic_name)
            while True:
                msg_dict = kafka_consumer_obj.poll(timeout_ms=100)
                for consume_msg in msg_dict.values():
                    for ori_msg in consume_msg:
                        try:
                            logging.info(ori_msg)
                            raw_val_dict = json.loads(ori_msg.value)
                            self._dispatch_event(raw_val_dict, user_feature)
                        except Exception:
                            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False