Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
P
physical
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
alpha
physical
Commits
81fbb0fb
Commit
81fbb0fb
authored
Oct 15, 2019
by
xqf
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Fix: set the weight of tags followed on the registration and search pages to 10
parent
bcf0a311
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
232 additions
and
201 deletions
+232
-201
collect_data.py
linucb/views/collect_data.py
+232
-201
No files found.
linucb/views/collect_data.py
View file @
81fbb0fb
# -*- coding: UTF-8 -*-
# !/usr/bin/env python
from
kafka
import
KafkaConsumer
import
random
from
libs.cache
import
redis_client
import
logging
from
linucb.views.linucb
import
LinUCB
import
json
from
trans2es.models.tag
import
TopicTag
,
Tag
from
trans2es.models.topic
import
TopicHomeRecommend
import
os
import
traceback
import
msgpack
from
django.conf
import
settings
from
kafka
import
KafkaConsumer
import
libs.tools
as
Tools
from
libs.cache
import
redis_client
from
libs.error
import
logging_exception
from
libs.es
import
ESPerform
from
linucb.views.linucb
import
LinUCB
from
search.utils.common
import
*
import
libs.tools
as
Tool
s
from
search.views.tag
import
get_same_tagset_id
s
from
trans2es.models.pictorial
import
CommunityPictorialHomeFeed
from
trans2es.models.portrait_stat
import
LikeDeviceTagStat
from
libs.error
import
logging_exception
import
os
from
search.views.tag
import
get_same_tagset_ids
import
msgpack
from
trans2es.models.tag
import
TopicTag
,
Tag
from
trans2es.models.topic
import
TopicHomeRecommend
def loads_data(data):
    """Decode a raw message payload, trying JSON first and msgpack second.

    Args:
        data: Raw payload (text or bytes) to decode.

    Returns:
        A ``(result, msg)`` tuple. ``msg`` is True when ``data`` was parsed
        as JSON and False when it was decoded with msgpack instead.

    Raises:
        Whatever ``msgpack.loads`` raises when the payload cannot be decoded
        by either format.
    """
    try:
        return json.loads(data), True
    except (TypeError, ValueError):
        # json.loads reports undecodable input through ValueError subclasses
        # (JSONDecodeError, UnicodeDecodeError) or TypeError for unsupported
        # input types. Only those trigger the msgpack fallback, so that
        # KeyboardInterrupt / SystemExit are no longer swallowed here.
        return msgpack.loads(data), False
class
KafkaManager
(
object
):
...
...
@@ -38,7 +38,6 @@ class KafkaManager(object):
@classmethod
def
get_kafka_consumer_ins
(
cls
,
topic_name
=
None
):
if
not
cls
.
consumser_obj
:
topic_name
=
settings
.
KAFKA_TOPIC_NAME
if
not
topic_name
else
topic_name
gm_logging_name
=
settings
.
KAFKA_GM_LOGGING_TOPIC_NAME
...
...
@@ -47,6 +46,7 @@ class KafkaManager(object):
return
cls
.
consumser_obj
class
CollectData
(
object
):
def
__init__
(
self
):
...
...
@@ -68,7 +68,6 @@ class CollectData(object):
# 默认
self
.
user_feature
=
[
0
,
1
]
def
_get_user_linucb_info
(
self
,
device_id
,
linucb_matrix_prefix
):
try
:
redis_key
=
linucb_matrix_prefix
+
str
(
device_id
)
...
...
@@ -83,7 +82,8 @@ class CollectData(object):
return
dict
()
def
update_recommend_tag_list
(
self
,
device_id
,
user_feature
=
None
,
user_id
=
None
,
click_topic_tag_list
=
None
,
new_user_click_tag_list
=
[],
linucb_matrix_prefix
=
None
,
linucb_recommend_tag_prefix
=
None
,
new_user_click_tag_list
=
[],
linucb_matrix_prefix
=
None
,
linucb_recommend_tag_prefix
=
None
,
linucb_topic_ids_prefix
=
None
,
linucb_pictorial_ids_prefix
=
None
):
try
:
redis_linucb_tag_data_dict
=
self
.
_get_user_linucb_info
(
device_id
,
linucb_matrix_prefix
)
...
...
@@ -101,13 +101,16 @@ class CollectData(object):
if
len
(
recommend_tag_list
)
>
0
:
tag_recommend_redis_key
=
linucb_recommend_tag_prefix
+
str
(
device_id
)
redis_client
.
set
(
tag_recommend_redis_key
,
json
.
dumps
(
recommend_tag_list
))
redis_client
.
expire
(
tag_recommend_redis_key
,
30
*
24
*
60
*
60
)
redis_client
.
expire
(
tag_recommend_redis_key
,
30
*
24
*
60
*
60
)
have_read_topic_id_list
=
Tools
.
get_have_read_topic_id_list
(
device_id
,
user_id
,
TopicPageType
.
HOME_RECOMMEND
)
have_read_topic_id_list
=
Tools
.
get_have_read_topic_id_list
(
device_id
,
user_id
,
TopicPageType
.
HOME_RECOMMEND
)
have_read_lin_pictorial_id_list
=
Tools
.
get_have_read_lin_pictorial_id_list
(
device_id
,
user_id
,
TopicPageType
.
HOME_RECOMMEND
)
promote_recommend_topic_id_list
=
TopicHomeRecommend
.
objects
.
using
(
settings
.
SLAVE1_DB_NAME
)
.
filter
(
is_online
=
1
)
.
values_list
(
"topic_id"
,
flat
=
True
)
promote_lin_pictorial_id_list
=
CommunityPictorialHomeFeed
.
objects
.
using
(
settings
.
SLAVE1_DB_NAME
)
.
filter
(
promote_recommend_topic_id_list
=
TopicHomeRecommend
.
objects
.
using
(
settings
.
SLAVE1_DB_NAME
)
.
filter
(
is_online
=
1
)
.
values_list
(
"topic_id"
,
flat
=
True
)
promote_lin_pictorial_id_list
=
CommunityPictorialHomeFeed
.
objects
.
using
(
settings
.
SLAVE1_DB_NAME
)
.
filter
(
is_deleted
=
0
,
is_online
=
1
)
.
values_list
(
"pictorial_id"
,
flat
=
True
)
have_read_topic_id_list
.
extend
(
promote_recommend_topic_id_list
)
have_read_lin_pictorial_id_list
.
extend
(
promote_lin_pictorial_id_list
)
...
...
@@ -118,10 +121,11 @@ class CollectData(object):
recommend_topic_id_list_click_dict
=
dict
()
recommend_lin_pictorial_id_list
=
list
()
if
click_topic_tag_list
and
len
(
click_topic_tag_list
)
>
0
:
if
click_topic_tag_list
and
len
(
click_topic_tag_list
)
>
0
:
click_topic_tag_list_same_tagset_ids
=
get_same_tagset_ids
(
click_topic_tag_list
)
recommend_topic_id_list_click
,
recommend_topic_id_list_click_dict
=
ESPerform
.
get_tag_topic_list_dict
(
click_topic_tag_list_same_tagset_ids
,
have_read_topic_id_list
,
size
=
2
)
recommend_topic_id_list_click
,
recommend_topic_id_list_click_dict
=
ESPerform
.
get_tag_topic_list_dict
(
click_topic_tag_list_same_tagset_ids
,
have_read_topic_id_list
,
size
=
2
)
if
len
(
recommend_topic_id_list_click
)
>
0
:
recommend_topic_id_list
.
extend
(
recommend_topic_id_list_click
)
recommend_topic_id_list_dict
.
update
(
recommend_topic_id_list_click_dict
)
...
...
@@ -145,35 +149,37 @@ class CollectData(object):
# b"data"] else []
# cursor = int(str(redis_topic_data_dict[b"cursor"], encoding="utf-8"))
# if len(recommend_topic_id_list)==0 and cursor==0 and len(redis_topic_list)>0:
# have_read_topic_id_list.extend(redis_topic_list[:2])
if
len
(
new_user_click_tag_list
)
>
0
:
new_user_click_tag_list_same_tagset_ids
=
get_same_tagset_ids
(
new_user_click_tag_list
)
tag_topic_id_list
,
tag_topic_dict
=
ESPerform
.
get_tag_topic_list_dict
(
new_user_click_tag_list_same_tagset_ids
,
have_read_topic_id_list
)
recommend_lin_pictorial_id_list
=
ESPerform
.
get_tag_pictorial_id_list
(
new_user_click_tag_list_same_tagset_ids
,
have_read_lin_pictorial_id_list
)
if
len
(
new_user_click_tag_list
)
>
0
:
new_user_click_tag_list_same_tagset_ids
=
get_same_tagset_ids
(
new_user_click_tag_list
)
tag_topic_id_list
,
tag_topic_dict
=
ESPerform
.
get_tag_topic_list_dict
(
new_user_click_tag_list_same_tagset_ids
,
have_read_topic_id_list
)
recommend_lin_pictorial_id_list
=
ESPerform
.
get_tag_pictorial_id_list
(
new_user_click_tag_list_same_tagset_ids
,
have_read_lin_pictorial_id_list
)
else
:
tag_id_list_same_tagset_ids
=
get_same_tagset_ids
(
tag_id_list
)
tag_topic_id_list
,
tag_topic_dict
=
ESPerform
.
get_tag_topic_list_dict
(
tag_id_list_same_tagset_ids
,
have_read_topic_id_list
)
recommend_lin_pictorial_id_list
=
ESPerform
.
get_tag_pictorial_id_list
(
tag_id_list_same_tagset_ids
,
have_read_lin_pictorial_id_list
)
tag_id_list_same_tagset_ids
=
get_same_tagset_ids
(
tag_id_list
)
tag_topic_id_list
,
tag_topic_dict
=
ESPerform
.
get_tag_topic_list_dict
(
tag_id_list_same_tagset_ids
,
have_read_topic_id_list
)
recommend_lin_pictorial_id_list
=
ESPerform
.
get_tag_pictorial_id_list
(
tag_id_list_same_tagset_ids
,
have_read_lin_pictorial_id_list
)
if
len
(
recommend_topic_id_list
)
>
0
or
len
(
tag_topic_id_list
)
>
0
or
len
(
new_user_click_tag_list
)
>
0
:
if
len
(
recommend_topic_id_list
)
>
0
or
len
(
tag_topic_id_list
)
>
0
or
len
(
new_user_click_tag_list
)
>
0
:
tag_topic_id_list
=
recommend_topic_id_list
+
tag_topic_id_list
tag_topic_dict
.
update
(
recommend_topic_id_list_dict
)
redis_data_dict
=
{
"data"
:
json
.
dumps
(
tag_topic_id_list
),
"datadict"
:
json
.
dumps
(
tag_topic_dict
),
"cursor"
:
0
"datadict"
:
json
.
dumps
(
tag_topic_dict
),
"cursor"
:
0
}
redis_client
.
hmset
(
topic_recommend_redis_key
,
redis_data_dict
)
redis_client
.
hmset
(
topic_recommend_redis_key
,
redis_data_dict
)
if
len
(
recommend_lin_pictorial_id_list
)
>
0
:
pictorial_data_dict
=
{
"data"
:
json
.
dumps
(
recommend_lin_pictorial_id_list
),
"cursor"
:
0
}
redis_client
.
hmset
(
pictorial_recommend_redis_key
,
pictorial_data_dict
)
pictorial_data_dict
=
{
"data"
:
json
.
dumps
(
recommend_lin_pictorial_id_list
),
"cursor"
:
0
}
redis_client
.
hmset
(
pictorial_recommend_redis_key
,
pictorial_data_dict
)
return
True
except
:
...
...
@@ -184,19 +190,19 @@ class CollectData(object):
def update_user_linucb_tag_info(self, reward, device_id, tag_id, user_feature, linucb_matrix_redis_prefix):
    """Apply one LinUCB update for a (device, tag) pair.

    Args:
        reward: Reward value forwarded to ``LinUCB.update_linucb_info``.
        device_id: Device whose LinUCB data is updated.
        tag_id: Tag the reward applies to.
        user_feature: Feature vector for the update; when falsy (``None`` or
            empty) ``self.user_feature`` is used instead.
        linucb_matrix_redis_prefix: Redis key prefix forwarded to
            ``LinUCB.update_linucb_info``.

    Returns:
        The return value of ``LinUCB.update_linucb_info``, or ``False`` when
        the update raised an exception (which is logged, not propagated).
    """
    try:
        # Fall back to the instance default when no usable feature vector is given.
        user_feature = user_feature or self.user_feature
        return LinUCB.update_linucb_info(user_feature, reward, tag_id, device_id,
                                         linucb_matrix_redis_prefix, redis_client)
    except Exception:
        # Best-effort update: log and report failure, but do not intercept
        # KeyboardInterrupt / SystemExit as the previous bare `except:` did.
        logging_exception()
        logging.error("update_user_linucb_tag_info error!")
        return False
def
transfer_old_info2ctr_feature_key
(
self
,
device_id
):
try
:
# 移植老用户的lin标签参数信息到ctr特征策略
ctr_linucb_matrix_redis_prefix_key
=
self
.
ctr_linucb_matrix_redis_prefix
+
str
(
device_id
)
linucb_matrix_redis_prefix_key
=
self
.
linucb_matrix_redis_prefix
+
str
(
device_id
)
if
redis_client
.
exists
(
ctr_linucb_matrix_redis_prefix_key
):
#如果新策略存在lin信息,则不需要移植
if
redis_client
.
exists
(
ctr_linucb_matrix_redis_prefix_key
):
#
如果新策略存在lin信息,则不需要移植
return
True
else
:
if
redis_client
.
exists
(
linucb_matrix_redis_prefix_key
):
...
...
@@ -223,7 +229,7 @@ class CollectData(object):
if
redis_client
.
exists
(
linucb_recommend_pictorial_id_prefix
):
older_device_info
=
redis_client
.
hgetall
(
linucb_recommend_pictorial_id_prefix
)
redis_client
.
hmset
(
ctr_linucb_recommend_pictorial_id_prefix
,
older_device_info
)
logging
.
info
(
"transfer_old_info2ctr_feature_key sucess:"
+
str
(
device_id
))
logging
.
info
(
"transfer_old_info2ctr_feature_key sucess:"
+
str
(
device_id
))
return
True
except
:
logging_exception
()
...
...
@@ -246,7 +252,7 @@ class CollectData(object):
logging
.
error
(
"get_device_tag_ctr error!"
)
return
0.0
def
consume_data_from_kafka
(
self
,
topic_name
=
None
):
def
consume_data_from_kafka
(
self
,
topic_name
=
None
):
try
:
user_feature
=
[
1
,
1
]
...
...
@@ -257,15 +263,16 @@ class CollectData(object):
consume_msg
=
msg_dict
[
msg_key
]
for
ori_msg
in
consume_msg
:
try
:
raw_val_dict
,
msg
=
loads_data
(
ori_msg
.
value
)
raw_val_dict
,
msg
=
loads_data
(
ori_msg
.
value
)
if
msg
:
logging
.
info
(
ori_msg
.
value
)
if
"type"
in
raw_val_dict
and
\
(
raw_val_dict
[
"type"
]
in
(
"on_click_feed_topic_card"
,
"on_click_button"
)):
(
raw_val_dict
[
"type"
]
in
(
"on_click_feed_topic_card"
,
"on_click_button"
)):
# 标签处理
click_topic_tag_list
=
list
()
device_id
=
""
if
"on_click_feed_topic_card"
==
raw_val_dict
[
"type"
]:
topic_id
=
raw_val_dict
[
"params"
][
"topic_id"
]
topic_id
=
raw_val_dict
[
"params"
][
"topic_id"
]
device_id
=
raw_val_dict
[
"device"
][
"device_id"
]
user_id
=
raw_val_dict
[
"user_id"
]
if
"user_id"
in
raw_val_dict
else
None
...
...
@@ -276,25 +283,36 @@ class CollectData(object):
# if collection and is_ai:
# click_topic_tag_list.append(id)
topic_tag_list
=
list
()
#从mysql表community_topictag中获取数据
#
从mysql表community_topictag中获取数据
click_results
=
TopicTag
.
objects
.
using
(
settings
.
SLAVE1_DB_NAME
)
.
filter
(
topic_id
=
topic_id
,
is_online
=
True
)
.
values_list
(
"tag_id"
,
"is_collection"
)
for
tag_id
,
is_collection
in
click_results
:
# topic_tag_list.append(tag_id)
if
is_collection
:
topic_tag_list
.
append
(
tag_id
)
#从mysql表community_tag中获取数据
#标签共有1000w
#select id from communty_tag where id in (select tag_id from community_topictag)
#关联得到100w标签
#
从mysql表community_tag中获取数据
#
标签共有1000w
#
select id from communty_tag where id in (select tag_id from community_topictag)
#
关联得到100w标签
tag_query_results
=
Tag
.
objects
.
using
(
settings
.
SLAVE1_DB_NAME
)
.
filter
(
id__in
=
topic_tag_list
,
is_online
=
True
,
is_deleted
=
False
,
is_category
=
False
)
.
values_list
(
"id"
,
"is_ai"
)
id__in
=
topic_tag_list
,
is_online
=
True
,
is_deleted
=
False
,
is_category
=
False
)
.
values_list
(
"id"
,
"is_ai"
)
for
id
,
is_ai
in
tag_query_results
:
click_topic_tag_list
.
append
(
id
)
logging
.
info
(
"positive tag_list,device_id:
%
s,topic_id:
%
s,tag_list:
%
s"
%
(
str
(
device_id
),
str
(
topic_id
),
str
(
click_topic_tag_list
)))
str
(
device_id
),
str
(
topic_id
),
str
(
click_topic_tag_list
)))
# 更新不同策略的lin标签参数信息
#点击信息流卡片(问题卡片和回答卡片)对应的标签权重为1
for
tag_id
in
click_topic_tag_list
:
self
.
update_user_linucb_tag_info
(
reward
,
device_id
,
tag_id
,
user_feature
,
self
.
linucb_matrix_redis_prefix
)
# 获取tag的ctr信息
device_tag_ctr
=
self
.
get_device_tag_ctr
(
device_id
,
tag_id
)
user_feature_ctr
=
[
device_tag_ctr
,
device_tag_ctr
]
self
.
update_user_linucb_tag_info
(
reward
,
device_id
,
tag_id
,
user_feature_ctr
,
self
.
ctr_linucb_matrix_redis_prefix
)
elif
raw_val_dict
[
"type"
]
==
"on_click_button"
and
"page_name"
in
\
raw_val_dict
[
"params"
]
and
"button_name"
in
raw_val_dict
[
"params"
]
\
and
"extra_param"
in
raw_val_dict
[
"params"
]:
...
...
@@ -303,35 +321,42 @@ class CollectData(object):
tag_name
=
raw_val_dict
[
"params"
][
"extra_param"
]
device_id
=
raw_val_dict
[
"device"
][
"device_id"
]
user_id
=
raw_val_dict
[
"user_id"
]
if
"user_id"
in
raw_val_dict
else
None
tag_list
=
list
(
Tag
.
objects
.
using
(
settings
.
SLAVE1_DB_NAME
)
.
filter
(
name
=
tag_name
,
is_online
=
True
,
is_deleted
=
False
,
is_category
=
False
)
.
values_list
(
"id"
,
flat
=
True
))
tag_list
=
list
(
Tag
.
objects
.
using
(
settings
.
SLAVE1_DB_NAME
)
.
filter
(
name
=
tag_name
,
is_online
=
True
,
is_deleted
=
False
,
is_category
=
False
)
.
values_list
(
"id"
,
flat
=
True
))
click_topic_tag_list
.
extend
(
tag_list
)
logging
.
info
(
"query tag attention,positive tag_list,device_id:
%
s,query_name:
%
s,tag_list:
%
s"
%
(
str
(
device_id
),
tag_name
,
str
(
click_topic_tag_list
)))
logging
.
info
(
"click_topic_tag_list:
%
s"
%
(
str
(
click_topic_tag_list
)))
logging
.
info
(
"query tag attention,positive tag_list,device_id:
%
s,query_name:
%
s,tag_list:
%
s"
%
(
str
(
device_id
),
tag_name
,
str
(
click_topic_tag_list
)))
# Update the LinUCB tag parameters for both strategies.
# Tags followed on the registration page and the search page get a weight of 10,
# implemented by replaying the same update ten times.
# NOTE(review): `reward` is not assigned anywhere in the visible code before
# this loop (the `reward = ...` assignment appears further down) -- confirm it
# is defined before this point, otherwise the first call raises NameError.
for _ in range(10):  # was `range[10]`, which raises TypeError (type is not subscriptable)
    for tag_id in click_topic_tag_list:
        self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature,
                                         self.linucb_matrix_redis_prefix)
        # Fetch the CTR of this tag for the device
        device_tag_ctr = self.get_device_tag_ctr(device_id, tag_id)
        user_feature_ctr = [device_tag_ctr, device_tag_ctr]
        self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature_ctr,
                                         self.ctr_linucb_matrix_redis_prefix)
logging
.
info
(
"click_topic_tag_list:
%
s"
%
(
str
(
click_topic_tag_list
)))
is_click
=
1
is_vote
=
0
# 如果点击和点赞满足一个回报即为1
reward
=
1
if
is_click
or
is_vote
else
0
# 用户处理
# 移植老用户的lin信息到ctr特征策略
self
.
transfer_old_info2ctr_feature_key
(
device_id
)
# 更新不同策略的lin标签参数信息
for
tag_id
in
click_topic_tag_list
:
self
.
update_user_linucb_tag_info
(
reward
,
device_id
,
tag_id
,
user_feature
,
self
.
linucb_matrix_redis_prefix
)
# 获取tag的ctr信息
device_tag_ctr
=
self
.
get_device_tag_ctr
(
device_id
,
tag_id
)
user_feature_ctr
=
[
device_tag_ctr
,
device_tag_ctr
]
self
.
update_user_linucb_tag_info
(
reward
,
device_id
,
tag_id
,
user_feature_ctr
,
self
.
ctr_linucb_matrix_redis_prefix
)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
if
len
(
click_topic_tag_list
)
>
0
:
if
len
(
click_topic_tag_list
)
>
0
:
self
.
update_recommend_tag_list
(
device_id
,
user_feature
,
user_id
,
click_topic_tag_list
=
click_topic_tag_list
,
linucb_matrix_prefix
=
self
.
linucb_matrix_redis_prefix
,
...
...
@@ -397,9 +422,9 @@ class CollectData(object):
# # 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
# self.update_recommend_tag_list(device_id, user_feature, user_id)
elif
"type"
in
raw_val_dict
and
"interest_choice_click_next"
==
raw_val_dict
[
"type"
]:
if
isinstance
(
raw_val_dict
[
"params"
][
"tagid_list"
],
str
):
if
isinstance
(
raw_val_dict
[
"params"
][
"tagid_list"
],
str
):
tagid_list
=
json
.
loads
(
raw_val_dict
[
"params"
][
"tagid_list"
])
elif
isinstance
(
raw_val_dict
[
"params"
][
"tagid_list"
],
list
):
elif
isinstance
(
raw_val_dict
[
"params"
][
"tagid_list"
],
list
):
tagid_list
=
raw_val_dict
[
"params"
][
"tagid_list"
]
else
:
tagid_list
=
list
()
...
...
@@ -407,43 +432,43 @@ class CollectData(object):
user_id
=
raw_val_dict
[
"user_id"
]
if
"user_id"
in
raw_val_dict
else
None
logging
.
info
(
"interest_choice_click_next type:
%
s, device_id:
%
s, tag_ids:
%
s"
%
(
raw_val_dict
.
get
(
"type"
,
"missing type"
),
str
(
device_id
),
str
(
tagid_list
)))
raw_val_dict
.
get
(
"type"
,
"missing type"
),
str
(
device_id
),
str
(
tagid_list
)))
# if len(exposure_sql_query_results)>0:
if
len
(
tagid_list
)
>
0
:
tag_query_results
=
list
(
Tag
.
objects
.
using
(
settings
.
SLAVE1_DB_NAME
)
.
filter
(
tag_query_results
=
list
(
Tag
.
objects
.
using
(
settings
.
SLAVE1_DB_NAME
)
.
filter
(
id__in
=
tagid_list
,
is_online
=
True
,
is_deleted
=
False
,
is_category
=
False
)
.
values_list
(
"id"
,
flat
=
True
))
is_click
=
1
is_vote
=
0
reward
=
1
if
is_click
or
is_vote
else
0
# 移植老用户的lin信息到ctr特征策略
self
.
transfer_old_info2ctr_feature_key
(
device_id
)
for
tag_id
in
tag_query_results
:
self
.
update_user_linucb_tag_info
(
reward
,
device_id
,
tag_id
,
user_feature
,
self
.
linucb_matrix_redis_prefix
)
# 获取tag的ctr信息
device_tag_ctr
=
self
.
get_device_tag_ctr
(
device_id
,
tag_id
)
user_feature_ctr
=
[
device_tag_ctr
,
device_tag_ctr
]
self
.
update_user_linucb_tag_info
(
reward
,
device_id
,
tag_id
,
user_feature_ctr
,
self
.
ctr_linucb_matrix_redis_prefix
)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self
.
update_recommend_tag_list
(
device_id
,
user_feature
,
user_id
,
new_user_click_tag_list
=
tag_query_results
,
linucb_matrix_prefix
=
self
.
linucb_matrix_redis_prefix
,
linucb_recommend_tag_prefix
=
self
.
linucb_recommend_redis_prefix
,
linucb_topic_ids_prefix
=
self
.
linucb_recommend_topic_id_prefix
,
linucb_pictorial_ids_prefix
=
self
.
linucb_recommend_pictorial_id_prefix
)
self
.
update_recommend_tag_list
(
device_id
,
user_feature
,
user_id
,
new_user_click_tag_list
=
tag_query_results
,
linucb_matrix_prefix
=
self
.
ctr_linucb_matrix_redis_prefix
,
linucb_recommend_tag_prefix
=
self
.
ctr_linucb_recommend_redis_prefix
,
linucb_topic_ids_prefix
=
self
.
ctr_linucb_recommend_topic_id_prefix
,
linucb_pictorial_ids_prefix
=
self
.
ctr_linucb_recommend_pictorial_id_prefix
)
is_category
=
False
)
.
values_list
(
"id"
,
flat
=
True
))
is_click
=
1
is_vote
=
0
reward
=
1
if
is_click
or
is_vote
else
0
# 移植老用户的lin信息到ctr特征策略
self
.
transfer_old_info2ctr_feature_key
(
device_id
)
for
tag_id
in
tag_query_results
:
self
.
update_user_linucb_tag_info
(
reward
,
device_id
,
tag_id
,
user_feature
,
self
.
linucb_matrix_redis_prefix
)
# 获取tag的ctr信息
device_tag_ctr
=
self
.
get_device_tag_ctr
(
device_id
,
tag_id
)
user_feature_ctr
=
[
device_tag_ctr
,
device_tag_ctr
]
self
.
update_user_linucb_tag_info
(
reward
,
device_id
,
tag_id
,
user_feature_ctr
,
self
.
ctr_linucb_matrix_redis_prefix
)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self
.
update_recommend_tag_list
(
device_id
,
user_feature
,
user_id
,
new_user_click_tag_list
=
tag_query_results
,
linucb_matrix_prefix
=
self
.
linucb_matrix_redis_prefix
,
linucb_recommend_tag_prefix
=
self
.
linucb_recommend_redis_prefix
,
linucb_topic_ids_prefix
=
self
.
linucb_recommend_topic_id_prefix
,
linucb_pictorial_ids_prefix
=
self
.
linucb_recommend_pictorial_id_prefix
)
self
.
update_recommend_tag_list
(
device_id
,
user_feature
,
user_id
,
new_user_click_tag_list
=
tag_query_results
,
linucb_matrix_prefix
=
self
.
ctr_linucb_matrix_redis_prefix
,
linucb_recommend_tag_prefix
=
self
.
ctr_linucb_recommend_redis_prefix
,
linucb_topic_ids_prefix
=
self
.
ctr_linucb_recommend_topic_id_prefix
,
linucb_pictorial_ids_prefix
=
self
.
ctr_linucb_recommend_pictorial_id_prefix
)
# 用户点击个性化push进linucb
elif
"type"
in
raw_val_dict
and
raw_val_dict
[
"type"
]
==
"on_click_push"
:
# 后端已过滤,该tag_ids是帖子/榜单的编辑标签
...
...
@@ -495,96 +520,102 @@ class CollectData(object):
str
(
tagid_list
)))
# 用户点击问题清单进linucb
elif
b
'content'
in
raw_val_dict
:
data
=
json
.
loads
(
raw_val_dict
[
b
'content'
])
if
'SYS'
in
data
and
'APP'
in
data
and
'action'
in
data
[
'SYS'
]
and
data
[
'SYS'
][
'action'
]
==
"venus/community/skin_check/submit_questions"
:
device_id
=
data
[
'SYS'
][
'cl_id'
]
tagid_list
=
list
(
data
[
'APP'
]
.
get
(
'answer_tag'
,
[]))
user_id
=
data
[
'SYS'
]
.
get
(
'user_id'
,
None
)
logging
.
info
(
"skin_check topic type:
%
s, device_id:
%
s, answer_tag:
%
s"
%
(
str
(
data
[
'SYS'
][
'action'
]),
str
(
device_id
),
str
(
tagid_list
)))
if
len
(
tagid_list
)
>
0
:
tag_query_results
=
list
(
Tag
.
objects
.
using
(
settings
.
SLAVE1_DB_NAME
)
.
filter
(
id__in
=
tagid_list
,
is_online
=
True
,
is_deleted
=
False
,
is_category
=
False
)
.
values_list
(
"id"
,
flat
=
True
))
tag_query_results_multi
=
[
i
for
i
in
tagid_list
if
i
in
tag_query_results
]
is_click
=
1
is_vote
=
0
reward
=
1
if
is_click
or
is_vote
else
0
# 移植老用户的lin信息到ctr特征策略
self
.
transfer_old_info2ctr_feature_key
(
device_id
)
for
i
in
range
(
5
):
for
tag_id
in
tag_query_results_multi
:
self
.
update_user_linucb_tag_info
(
reward
,
device_id
,
tag_id
,
user_feature
,
self
.
linucb_matrix_redis_prefix
)
# 获取tag的ctr信息
device_tag_ctr
=
self
.
get_device_tag_ctr
(
device_id
,
tag_id
)
user_feature_ctr
=
[
device_tag_ctr
,
device_tag_ctr
]
self
.
update_user_linucb_tag_info
(
reward
,
device_id
,
tag_id
,
user_feature_ctr
,
self
.
ctr_linucb_matrix_redis_prefix
)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self
.
update_recommend_tag_list
(
device_id
,
user_feature
,
user_id
,
linucb_matrix_prefix
=
self
.
linucb_matrix_redis_prefix
,
linucb_recommend_tag_prefix
=
self
.
linucb_recommend_redis_prefix
,
linucb_topic_ids_prefix
=
self
.
linucb_recommend_topic_id_prefix
,
linucb_pictorial_ids_prefix
=
self
.
linucb_recommend_pictorial_id_prefix
)
self
.
update_recommend_tag_list
(
device_id
,
user_feature
,
user_id
,
linucb_matrix_prefix
=
self
.
ctr_linucb_matrix_redis_prefix
,
linucb_recommend_tag_prefix
=
self
.
ctr_linucb_recommend_redis_prefix
,
linucb_topic_ids_prefix
=
self
.
ctr_linucb_recommend_topic_id_prefix
,
linucb_pictorial_ids_prefix
=
self
.
ctr_linucb_recommend_pictorial_id_prefix
)
logging
.
info
(
"skin_check topic type:
%
s, device_id:
%
s, tag_query_results:
%
s"
%
(
str
(
data
[
'SYS'
][
'action'
]),
str
(
device_id
),
str
(
tag_query_results_multi
)))
# 品牌问卷进linucb
elif
'SYS'
in
data
and
'APP'
in
data
and
'action'
in
data
[
'SYS'
]
and
data
[
'SYS'
][
'action'
]
==
"venus/community/survey_question/submit"
:
device_id
=
data
[
'SYS'
][
'cl_id'
]
tagid_list
=
list
(
data
[
'APP'
]
.
get
(
'answer_tag'
,
[]))
user_id
=
data
[
'SYS'
]
.
get
(
'user_id'
,
None
)
logging
.
info
(
"survey_question type:
%
s, device_id:
%
s, answer_tag:
%
s"
%
(
str
(
data
[
'SYS'
][
'action'
]),
str
(
device_id
),
str
(
tagid_list
)))
if
len
(
tagid_list
)
>
0
:
tag_query_results
=
list
(
Tag
.
objects
.
using
(
settings
.
SLAVE1_DB_NAME
)
.
filter
(
id__in
=
tagid_list
,
is_online
=
True
,
is_deleted
=
False
,
is_category
=
False
)
.
values_list
(
"id"
,
flat
=
True
))
tag_query_results_multi
=
[
i
for
i
in
tagid_list
if
i
in
tag_query_results
]
is_click
=
1
is_vote
=
0
reward
=
1
if
is_click
or
is_vote
else
0
# 移植老用户的lin信息到ctr特征策略
self
.
transfer_old_info2ctr_feature_key
(
device_id
)
for
i
in
range
(
5
):
for
tag_id
in
tag_query_results_multi
:
self
.
update_user_linucb_tag_info
(
reward
,
device_id
,
tag_id
,
user_feature
,
self
.
linucb_matrix_redis_prefix
)
# 获取tag的ctr信息
device_tag_ctr
=
self
.
get_device_tag_ctr
(
device_id
,
tag_id
)
user_feature_ctr
=
[
device_tag_ctr
,
device_tag_ctr
]
self
.
update_user_linucb_tag_info
(
reward
,
device_id
,
tag_id
,
user_feature_ctr
,
self
.
ctr_linucb_matrix_redis_prefix
)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self
.
update_recommend_tag_list
(
device_id
,
user_feature
,
user_id
,
linucb_matrix_prefix
=
self
.
linucb_matrix_redis_prefix
,
linucb_recommend_tag_prefix
=
self
.
linucb_recommend_redis_prefix
,
linucb_topic_ids_prefix
=
self
.
linucb_recommend_topic_id_prefix
,
linucb_pictorial_ids_prefix
=
self
.
linucb_recommend_pictorial_id_prefix
)
self
.
update_recommend_tag_list
(
device_id
,
user_feature
,
user_id
,
linucb_matrix_prefix
=
self
.
ctr_linucb_matrix_redis_prefix
,
linucb_recommend_tag_prefix
=
self
.
ctr_linucb_recommend_redis_prefix
,
linucb_topic_ids_prefix
=
self
.
ctr_linucb_recommend_topic_id_prefix
,
linucb_pictorial_ids_prefix
=
self
.
ctr_linucb_recommend_pictorial_id_prefix
)
logging
.
info
(
"survey_question type:
%
s, device_id:
%
s, tagid_list:
%
s"
%
(
str
(
data
[
'SYS'
][
'action'
]),
str
(
device_id
),
str
(
tag_query_results_multi
)))
data
=
json
.
loads
(
raw_val_dict
[
b
'content'
])
if
'SYS'
in
data
and
'APP'
in
data
and
'action'
in
data
[
'SYS'
]
and
data
[
'SYS'
][
'action'
]
==
"venus/community/skin_check/submit_questions"
:
device_id
=
data
[
'SYS'
][
'cl_id'
]
tagid_list
=
list
(
data
[
'APP'
]
.
get
(
'answer_tag'
,
[]))
user_id
=
data
[
'SYS'
]
.
get
(
'user_id'
,
None
)
logging
.
info
(
"skin_check topic type:
%
s, device_id:
%
s, answer_tag:
%
s"
%
(
str
(
data
[
'SYS'
][
'action'
]),
str
(
device_id
),
str
(
tagid_list
)))
if
len
(
tagid_list
)
>
0
:
tag_query_results
=
list
(
Tag
.
objects
.
using
(
settings
.
SLAVE1_DB_NAME
)
.
filter
(
id__in
=
tagid_list
,
is_online
=
True
,
is_deleted
=
False
,
is_category
=
False
)
.
values_list
(
"id"
,
flat
=
True
))
tag_query_results_multi
=
[
i
for
i
in
tagid_list
if
i
in
tag_query_results
]
is_click
=
1
is_vote
=
0
reward
=
1
if
is_click
or
is_vote
else
0
# 移植老用户的lin信息到ctr特征策略
self
.
transfer_old_info2ctr_feature_key
(
device_id
)
for
i
in
range
(
5
):
for
tag_id
in
tag_query_results_multi
:
self
.
update_user_linucb_tag_info
(
reward
,
device_id
,
tag_id
,
user_feature
,
self
.
linucb_matrix_redis_prefix
)
# 获取tag的ctr信息
device_tag_ctr
=
self
.
get_device_tag_ctr
(
device_id
,
tag_id
)
user_feature_ctr
=
[
device_tag_ctr
,
device_tag_ctr
]
self
.
update_user_linucb_tag_info
(
reward
,
device_id
,
tag_id
,
user_feature_ctr
,
self
.
ctr_linucb_matrix_redis_prefix
)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self
.
update_recommend_tag_list
(
device_id
,
user_feature
,
user_id
,
linucb_matrix_prefix
=
self
.
linucb_matrix_redis_prefix
,
linucb_recommend_tag_prefix
=
self
.
linucb_recommend_redis_prefix
,
linucb_topic_ids_prefix
=
self
.
linucb_recommend_topic_id_prefix
,
linucb_pictorial_ids_prefix
=
self
.
linucb_recommend_pictorial_id_prefix
)
self
.
update_recommend_tag_list
(
device_id
,
user_feature
,
user_id
,
linucb_matrix_prefix
=
self
.
ctr_linucb_matrix_redis_prefix
,
linucb_recommend_tag_prefix
=
self
.
ctr_linucb_recommend_redis_prefix
,
linucb_topic_ids_prefix
=
self
.
ctr_linucb_recommend_topic_id_prefix
,
linucb_pictorial_ids_prefix
=
self
.
ctr_linucb_recommend_pictorial_id_prefix
)
logging
.
info
(
"skin_check topic type:
%
s, device_id:
%
s, tag_query_results:
%
s"
%
(
str
(
data
[
'SYS'
][
'action'
]),
str
(
device_id
),
str
(
tag_query_results_multi
)))
# 品牌问卷进linucb
elif
'SYS'
in
data
and
'APP'
in
data
and
'action'
in
data
[
'SYS'
]
and
data
[
'SYS'
][
'action'
]
==
"venus/community/survey_question/submit"
:
device_id
=
data
[
'SYS'
][
'cl_id'
]
tagid_list
=
list
(
data
[
'APP'
]
.
get
(
'answer_tag'
,
[]))
user_id
=
data
[
'SYS'
]
.
get
(
'user_id'
,
None
)
logging
.
info
(
"survey_question type:
%
s, device_id:
%
s, answer_tag:
%
s"
%
(
str
(
data
[
'SYS'
][
'action'
]),
str
(
device_id
),
str
(
tagid_list
)))
if
len
(
tagid_list
)
>
0
:
tag_query_results
=
list
(
Tag
.
objects
.
using
(
settings
.
SLAVE1_DB_NAME
)
.
filter
(
id__in
=
tagid_list
,
is_online
=
True
,
is_deleted
=
False
,
is_category
=
False
)
.
values_list
(
"id"
,
flat
=
True
))
tag_query_results_multi
=
[
i
for
i
in
tagid_list
if
i
in
tag_query_results
]
is_click
=
1
is_vote
=
0
reward
=
1
if
is_click
or
is_vote
else
0
# 移植老用户的lin信息到ctr特征策略
self
.
transfer_old_info2ctr_feature_key
(
device_id
)
for
i
in
range
(
5
):
for
tag_id
in
tag_query_results_multi
:
self
.
update_user_linucb_tag_info
(
reward
,
device_id
,
tag_id
,
user_feature
,
self
.
linucb_matrix_redis_prefix
)
# 获取tag的ctr信息
device_tag_ctr
=
self
.
get_device_tag_ctr
(
device_id
,
tag_id
)
user_feature_ctr
=
[
device_tag_ctr
,
device_tag_ctr
]
self
.
update_user_linucb_tag_info
(
reward
,
device_id
,
tag_id
,
user_feature_ctr
,
self
.
ctr_linucb_matrix_redis_prefix
)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self
.
update_recommend_tag_list
(
device_id
,
user_feature
,
user_id
,
linucb_matrix_prefix
=
self
.
linucb_matrix_redis_prefix
,
linucb_recommend_tag_prefix
=
self
.
linucb_recommend_redis_prefix
,
linucb_topic_ids_prefix
=
self
.
linucb_recommend_topic_id_prefix
,
linucb_pictorial_ids_prefix
=
self
.
linucb_recommend_pictorial_id_prefix
)
self
.
update_recommend_tag_list
(
device_id
,
user_feature
,
user_id
,
linucb_matrix_prefix
=
self
.
ctr_linucb_matrix_redis_prefix
,
linucb_recommend_tag_prefix
=
self
.
ctr_linucb_recommend_redis_prefix
,
linucb_topic_ids_prefix
=
self
.
ctr_linucb_recommend_topic_id_prefix
,
linucb_pictorial_ids_prefix
=
self
.
ctr_linucb_recommend_pictorial_id_prefix
)
logging
.
info
(
"survey_question type:
%
s, device_id:
%
s, tagid_list:
%
s"
%
(
str
(
data
[
'SYS'
][
'action'
]),
str
(
device_id
),
str
(
tag_query_results_multi
)))
else
:
if
msg
:
logging
.
warning
(
"unknown type msg:
%
s"
%
raw_val_dict
.
get
(
"type"
,
"missing type"
))
logging
.
warning
(
"unknown type msg:
%
s"
%
raw_val_dict
.
get
(
"type"
,
"missing type"
))
except
:
logging_exception
()
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment