Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
F
ffm-baseline
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ML
ffm-baseline
Commits
d3e160fa
Commit
d3e160fa
authored
Nov 05, 2019
by
张彦钊
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
change
parent
aac4ef4b
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
1390 additions
and
38 deletions
+1390
-38
content.py
content.py
+1311
-0
es_tag.py
es_tag.py
+79
-38
No files found.
content.py
0 → 100644
View file @
d3e160fa
from
itertools
import
chain
,
islice
,
cycle
import
datetime
from
collections
import
Counter
from
gm_types.gaia
import
DIARY_ORDER_TYPE
from
gm_types.doris
import
ANSWER_SORT_TYPE
from
gm_types.doris
import
ARTICLE_SORT_TYPE
from
gm_types.mimas
import
CONTENT_CLASS
from
gm_types.doris
import
CARD_TYPE
from
gm_types.gaia
import
CITY_LEVEL
from
gm_rpcd.all
import
bind
import
traceback
from
search.utils.diary
import
recall_diary
from
search.utils.answer
import
recall_answers
from
search.utils.article
import
recall_articles
from
search.utils.service
import
recommed_service_category_device_id
from
gm_rpcd.all
import
context
from
libs.algorithms
import
drop_dup
from
libs.cache
import
redis_client
,
redis_client2
from
libs.error
import
logging_exception
import
time
from
extend.models.gaia
import
City
,
CityScale
from
extend.models.gold
import
(
QAQueue
,
WikiQueue
,
IconQueue
,
UserTopicQueue
,
DoctorTopicQueue
,
DiaryQueue
,
ArticleQueue
,
AnswerQueue
,
DeviceQAQueue
,
DeviceIconQueue
,
DeviceUserTopicQueue
,
DeviceDoctorTopicQueue
,
DeviceAnswerQueue
,
DeviceArticleQueue
,
DeviceDiaryQueue
,
QuestionQueue
,
DeviceQuestionQueue
)
import
logging
import
redis
import
json
from
django.conf
import
settings
import
traceback
from
recommend.utils.diary_portrait
import
fetch_diary_by_user_portrait
from
recommend.utils.diary_portrait
import
fetch_qa_by_user_portrait
from
recommend.utils.diary_portrait
import
fetch_topic_by_user_portrait
# Size of the "head window" used for recall + de-duplication in the
# get_diaries / get_answers endpoints below.
MAX_LOAD = 200

# Module-level logger, keyed by module name per logging convention.
logger = logging.getLogger(__name__)
@bind("doris/recommend/get_diaries")
def get_diaries(tags, city, offset=0, size=10, city_tag_id=None):
    """Recommend diary ids for one page of the feed.

    Args:
        tags: optional tag ids used as the ``closure_tag_ids`` recall filter.
        city: city id; used to resolve a city tag when *city_tag_id* is absent.
        offset, size: pagination window.
        city_tag_id: explicit city tag id; takes precedence over *city*.

    Returns:
        dict: ``{"diaries_ids": [diary_id, ...]}``
    """
    # NOTE: city as city id
    sort_params = {}
    if city_tag_id:
        sort_params["user_city_tag_id"] = city_tag_id
    elif city:
        try:
            sort_params["user_city_tag_id"] = City.objects.get(id=city).tag_id
        except City.DoesNotExist:
            pass
    filters = {
        "is_sink": False,
        "has_before_cover": True,
        "has_after_cover": True,
        "content_level_is_good": True,
    }
    if tags:
        filters["closure_tag_ids"] = tags
    tail = offset + size
    diaries_ids = []
    if tail < MAX_LOAD:
        # Recall the head of the ranked list and de-duplicate by author
        # before paging. Fix: use MAX_LOAD instead of the hard-coded 200 so
        # the recall window stays consistent with the `tail < MAX_LOAD`
        # guard above (and with get_answers, which already uses MAX_LOAD).
        diaries = recall_diary(None, 0, MAX_LOAD, filters,
                               DIARY_ORDER_TYPE.RECOMMEND, sort_params,
                               fields=["id", "user.id"])
        diaries_items = [(diary["id"], diary["user"]["id"]) for diary in diaries]
        drop_dup_diaries = drop_dup(diaries_items)
        if tail <= len(drop_dup_diaries):
            diaries_ids = [item[0] for item in drop_dup_diaries[offset:tail]]
    if not diaries_ids:
        # Once the de-duplicated head window is exhausted, page the ranked
        # list directly without de-duplication.
        diaries = recall_diary(None, offset, size, filters,
                               DIARY_ORDER_TYPE.RECOMMEND, sort_params,
                               fields=["id"])
        diaries_ids = [diary["id"] for diary in diaries]
    return {"diaries_ids": diaries_ids}
@bind("doris/recommend/get_articles")
def get_articles(tags, offset=0, size=10):
    """Recommend article ids.

    Recalls articles whose content level is EXCELLENT or FINE, optionally
    restricted to *tags*, and returns ``{"article_ids": [...]}``.
    """
    recall_filters = {"content_level": [CONTENT_CLASS.EXCELLENT, CONTENT_CLASS.FINE]}
    if tags:
        recall_filters["tag_ids"] = tags
    recalled = recall_articles(None, offset, size, recall_filters,
                               ARTICLE_SORT_TYPE.RECOMMEND, {})
    ids = [row["id"] for row in recalled]
    return {"article_ids": ids}
@bind("doris/recommend/get_answers")
def get_answers(tags, offset=0, size=10):
    """Recommend answer ids, de-duplicating by author within the first
    MAX_LOAD recalled answers; beyond that window, page without dedup.

    Returns ``{"answer_ids": [...]}``.
    """
    recall_filters = {"content_level": [CONTENT_CLASS.EXCELLENT, CONTENT_CLASS.FINE]}
    if tags:
        recall_filters["tag_ids"] = tags
    tail = offset + size
    result_ids = []
    if tail < MAX_LOAD:
        # Recall a large head slice, keep only well-formed rows, then drop
        # duplicates by (id, user_id) via the shared drop_dup helper.
        recalled = recall_answers(None, 0, MAX_LOAD, recall_filters,
                                  ANSWER_SORT_TYPE.RECOMMEND, {},
                                  fields=["id", "user_id"])
        pairs = [(row["id"], row["user_id"])
                 for row in recalled
                 if "id" in row and "user_id" in row]
        deduped = drop_dup(pairs)
        if tail <= len(deduped):
            result_ids = [pair[0] for pair in deduped[offset:tail]]
    if not result_ids:
        # Deduped head window exhausted: page the ranked list directly.
        recalled = recall_answers(None, offset, size, recall_filters,
                                  ANSWER_SORT_TYPE.RECOMMEND, {})
        result_ids = [row["id"] for row in recalled]
    return {"answer_ids": result_ids}
@bind('doris/recommend/icon')
def fetch_icon(device_id, size):
    """Return up to *size* icon ids for a device.

    Reads the device-specific icon queue (falling back to the latest global
    queue), then cycles through it from a fixed cursor of 0. Always returns
    ``{"icon": [int, ...]}``; any failure degrades to an empty list.
    """
    try:
        card_type = "icon"
        try:
            que = DeviceIconQueue.objects.get(device_id=device_id)
        except DeviceIconQueue.DoesNotExist:
            que = IconQueue.objects.last()
        if not que:
            return {"icon": []}
        que = list(filter(None, que.queue.split(',')))
        if not que:
            # Fix: an all-empty queue string previously hit
            # `int(cursor) % len(que)` with len 0 and raised
            # ZeroDivisionError (silently swallowed by the bare except,
            # but logged as a spurious exception). Return empty directly.
            return {"icon": []}
        # adjust args.
        cursor = 0
        size = min(size, len(que))
        data = list(islice(cycle(que), cursor, cursor + size))
        return {card_type: list(map(int, data))}
    except:
        logging_exception()
        return {"icon": []}
@bind('doris/recommend/homepage_polymer')
def fetch_polymer_ids(device_id, size):
    """Return up to *size* homepage polymer ids for a device.

    Mirrors fetch_icon: device-specific icon queue with global fallback,
    cycled from a fixed cursor of 0. Always returns
    ``{"polymer_ids": [int, ...]}``; any failure degrades to an empty list.
    """
    try:
        card_type = "polymer_ids"
        try:
            que = DeviceIconQueue.objects.get(device_id=device_id)
        except DeviceIconQueue.DoesNotExist:
            que = IconQueue.objects.last()
        if not que:
            return {"polymer_ids": []}
        que = list(filter(None, que.queue.split(',')))
        if not que:
            # Fix: same ZeroDivisionError-on-empty-queue defect as
            # fetch_icon — guard explicitly instead of relying on the
            # bare except to swallow it.
            return {"polymer_ids": []}
        # adjust args.
        cursor = 0
        size = min(size, len(que))
        data = list(islice(cycle(que), cursor, cursor + size))
        return {card_type: list(map(int, data))}
    except:
        logging_exception()
        return {"polymer_ids": []}
@bind('doris/recommend/feed')
def recommend_feed(device_id, card_type, city_id, size):
    """RPC entry point for the home feed.

    Delegates to RecommendFeed.dispatch and never raises: any failure is
    logged and an empty payload of the same shape is returned.
    """
    try:
        result = RecommendFeed.dispatch(device_id, card_type, city_id, size)
    except:
        logging_exception()
        result = {card_type: [], "cpc_ids": []}
    return result
class
RecommendFeed
:
    @classmethod
    def dispatch(cls, device_id, card_type, city_id, size):
        """Route one feed request to the per-card-type fetcher.

        Returns ``{card_type: data, "cpc_ids": cpc_ids}``; ``cpc_ids`` is
        only populated for DIARY cards (fetch_diary returns a pair).
        Branches that feed int-typed consumers coerce ids with
        ``list(map(int, data))``; QA/USERTOPIC/ENCYCLOPEDIA return the
        fetcher's result as-is.
        """
        data = []
        cpc_ids = []
        time_begin = time.time()
        if card_type == CARD_TYPE.QA:
            data = cls.fetch_qa(device_id, card_type, size)
            # Timing logs below are ad-hoc diagnostics left by a previous
            # author ("duan add test"); kept verbatim.
            logging.info("duan add test,fetch_qa cost:%f,device_id:%s" % ((time.time() - time_begin), str(device_id)))
        elif card_type == CARD_TYPE.ANSWER:
            data = cls.fetch_answer(device_id, card_type, size)
            data = list(map(int, data))
            logging.info("duan add test,fetch_answer cost:%f" % (time.time() - time_begin))
        elif card_type == CARD_TYPE.ARTICLE:
            data = cls.fetch_article(device_id, card_type, size)
            data = list(map(int, data))
        elif card_type == CARD_TYPE.QUESTION:
            data = cls.fetch_question(device_id, card_type, size)
            data = list(map(int, data))
        elif card_type == CARD_TYPE.DIARY:
            # fetch_diary returns (ids, cpc_ids) or a falsy value on miss.
            total = cls.fetch_diary(device_id, card_type, city_id, size)
            if total:
                data = total[0]
                cpc_ids = total[1]
            logging.info("duan add test,fetch_diary cost:%f,device_id:%s" % ((time.time() - time_begin), str(device_id)))
        elif card_type == CARD_TYPE.USERTOPIC:
            data = cls.fetch_user_topic(device_id, card_type, size)
            logging.info("duan add test,fetch_user_topic cost:%f,device_id:%s" % ((time.time() - time_begin), str(device_id)))
        elif card_type == CARD_TYPE.DOCTORTOPIC:
            data = cls.fetch_doctor_topic(device_id, card_type, size)
            data = list(map(int, data))
        elif card_type == CARD_TYPE.ENCYCLOPEDIA:
            data = cls.fetch_wiki(device_id, card_type, size)
        return {card_type: data, "cpc_ids": cpc_ids}
@staticmethod
def
current_date
():
return
datetime
.
datetime
.
now
()
.
strftime
(
'
%
Y-
%
m-
%
d'
)
    @staticmethod
    def fetch_qa(device_id, card_type, size):
        """Build one page of recommended QA (answer) ids for a device.

        Pipeline for a real device (device_id != '0'):
          1. portrait-based recall (fetch_qa_by_user_portrait), excluding
             already-read ids tracked in redis;
          2. top up from the per-device search-recommend queue, filtering
             out ids the device disliked (stored in gm_kv redis);
          3. fall back to the persisted DeviceQAQueue / global AnswerQueue,
             resuming from a "location" cursor kept in the qa_location hash
             ("tail" marks an exhausted queue).
        For device_id == '0' the unfiltered queue path is used directly.
        Any error is logged and an empty list returned.

        NOTE(review): nesting of a few branches was reconstructed from an
        unformatted dump; semantics follow the token order.
        """
        try:
            def filter_qa(device_id, cid_list):
                # Remove ids the device has disliked ("<device>_dislike_qa"
                # set in gm_kv). Best-effort: any failure returns the input
                # list unchanged.
                try:
                    gmkv = None
                    for gm_kv_host_item in settings.GM_KV_HOSTS:
                        gmkv = get_gmkv(redis_ip=gm_kv_host_item["host"],
                                        redis_port=gm_kv_host_item["port"],
                                        redis_db=gm_kv_host_item["db"],
                                        redis_password=gm_kv_host_item["password"])
                        if gmkv:
                            break
                    key = str(device_id) + "_dislike_qa"
                    if gmkv.exists(key):
                        dislike = gmkv.smembers(key)
                        if len(cid_list) > 0:
                            # smembers returns bytes; encode int/str ids for
                            # membership tests, otherwise compare raw.
                            if type(cid_list[0]) == int or type(cid_list[0]) == str:
                                cid_list = [i for i in cid_list
                                            if str(i).encode('utf-8') not in dislike]
                            else:
                                cid_list = [i for i in cid_list if i not in dislike]
                        return cid_list
                    else:
                        return cid_list
                except:
                    return cid_list

            def read_history(cid_list):
                # Record served ids into today's set and the rolling
                # "already read" set; drop entries older than 14 days.
                redis_client.sadd(today_qa_key, *cid_list)
                redis_client.expire(today_qa_key, 14 * 24 * 60 * 60)
                redis_client.sadd(read_qa_key, *cid_list)
                if redis_client.exists(old_qa_key):
                    redis_client.sdiffstore(read_qa_key, read_qa_key, old_qa_key)
                    redis_client.delete(old_qa_key)
                redis_client.expire(read_qa_key, time=13 * 24 * 60 * 60)

            def get_gmkv(redis_ip, redis_port, redis_db, redis_password=""):
                # Connect to one gm_kv redis host; None on any failure so the
                # caller can try the next host.
                try:
                    if len(redis_password) == 0:
                        cli_ins = redis.Redis(host=redis_ip, port=redis_port,
                                              db=redis_db, socket_timeout=2)
                    else:
                        cli_ins = redis.Redis(host=redis_ip, port=redis_port,
                                              db=redis_db, password=redis_password,
                                              socket_timeout=2)
                    cli_ins.ping()
                    return cli_ins
                except:
                    return None

            def no_filter_qa(size):
                # Serve ids from the persisted queue without dislike
                # filtering, advancing a per-(device, card_type, day) cursor
                # stored in redis; wraps around via cycle().
                key = '{device_id}-{card_type}-{date}'.format(
                    device_id=device_id, card_type=card_type,
                    date=RecommendFeed.current_date())
                try:
                    que = DeviceQAQueue.objects.get(device_id=device_id)
                except DeviceQAQueue.DoesNotExist:
                    que = AnswerQueue.objects.last()
                if not que:
                    return []
                que = list(filter(None, que.queue.split(',')))
                # adjust args.
                cursor = redis_client.get(key) or 0
                if len(que) == 0:
                    return []
                else:
                    cursor = int(cursor) % len(que)
                size = min(size, len(que))
                # redis_client.set(key, cursor + size, ex=24 * 60 * 60)
                data = list(islice(cycle(que), cursor, cursor + size))
                data = list(map(int, data))
                if cursor + 2 * size < len(que):
                    redis_client.set(key, cursor + size, ex=24 * 60 * 60)
                else:
                    # Near the end of the queue: ask the pipeline to rebuild
                    # it and restart the cursor; on failure just advance.
                    try:
                        context.request_logger.app(reset_answer_queue=True)
                        cursor = 0
                        redis_client.set(key, cursor, ex=24 * 60 * 60)
                    except:
                        redis_client.set(key, cursor + size, ex=24 * 60 * 60)
                return data

            search_qa_recommend_list = list()
            # Rolling read-history keys (overall / 14-days-ago / today).
            read_qa_key = "TS:recommend_answer_set:device_id:" + str(device_id)
            old_qa_key = "TS:recommend_answer_set:device_id:{}:{}" \
                .format(device_id, (datetime.date.today() - datetime.timedelta(days=14)).strftime("%Y-%m-%d"))
            today_qa_key = "TS:recommend_answer_set:device_id:{}:{}" \
                .format(device_id, datetime.date.today().strftime("%Y-%m-%d"))
            # Hash holding the queue cursor ("location") or exhaustion
            # marker ("tail") for this device.
            answer_key = "qa_location:" + str(device_id)
            time_begin = time.time()
            if device_id != '0':
                # if recommed_service_category_device_id(device_id):
                read_list = []
                if redis_client.exists(read_qa_key):
                    read_list = [int(x) for x in list(redis_client.smembers(read_qa_key))]
                # Step 1: portrait-based recall, excluding already-read ids.
                data = fetch_qa_by_user_portrait(device_id, read_list, size)
                if data:
                    read_history(data)
                    if len(data) >= size:
                        logging.info("duan add,qa time cost:%f" % (time.time() - time_begin))
                        return data
                    else:
                        size = size - len(data)
                        search_qa_recommend_list.extend(data)
                logger.info("portrait_fetch_qa:supplement1:device_id:{0}:size:{1}".format(device_id, size))
                # Step 2: take at most one id from the search-recommend queue.
                search_qa_recommend_key = "TS:search_recommend_answer_queue:device_id:" + str(device_id)
                if redis_client.exists(search_qa_recommend_key):
                    search_qa_recommend_dict = redis_client.hgetall(search_qa_recommend_key)
                    queue_list = json.loads(search_qa_recommend_dict[b'answer_queue'])
                    queue_list = filter_qa(device_id, queue_list)
                    if len(queue_list) == 0:
                        redis_client.delete(search_qa_recommend_key)
                    elif len(queue_list) == 1:
                        size = size - 1
                        search_qa_recommend_list = queue_list
                        redis_client.delete(search_qa_recommend_key)
                    else:
                        size = size - 1
                        search_qa_recommend_list.append(queue_list[0])
                        redis_client.hset(search_qa_recommend_key, "answer_queue",
                                          json.dumps(queue_list[1:]))
                # Step 3: top up from the persisted queue using the
                # location/tail cursor state.
                if redis_client.exists(answer_key):
                    if b"tail" in redis_client.hgetall(answer_key):
                        # Queue already exhausted: serve what we have, or
                        # fall back to the unfiltered cycled queue.
                        if len(search_qa_recommend_list) > 0:
                            search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                            read_history(search_qa_recommend_list)
                        else:
                            search_qa_recommend_list = no_filter_qa(size)
                            if len(search_qa_recommend_list) > 0:
                                read_history(search_qa_recommend_list)
                        return search_qa_recommend_list
                    elif b"location" in redis_client.hgetall(answer_key):
                        try:
                            que = DeviceQAQueue.objects.get(device_id=device_id)
                        except DeviceQAQueue.DoesNotExist:
                            que = AnswerQueue.objects.last()
                        if not que:
                            if len(search_qa_recommend_list) > 0:
                                search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                                read_history(search_qa_recommend_list)
                            return search_qa_recommend_list
                        location = int(redis_client.hgetall(answer_key)[b"location"])
                        old_qa = list(filter(None, que.queue.split(',')))
                        after_filter_qa = filter_qa(device_id, old_qa[location:])
                        if (location >= len(old_qa) - 1) or (len(after_filter_qa) == 0):
                            # Cursor at (or past) the end: mark exhausted.
                            redis_client.hset(answer_key, "tail", "1")
                            redis_client.expire(answer_key, 3 * 60 * 60)
                            if len(search_qa_recommend_list) > 0:
                                search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                                read_history(search_qa_recommend_list)
                            else:
                                search_qa_recommend_list = no_filter_qa(size)
                                if len(search_qa_recommend_list) > 0:
                                    read_history(search_qa_recommend_list)
                            return search_qa_recommend_list
                        elif len(after_filter_qa) <= size:
                            # Remainder fits in this page: drain and mark tail.
                            search_qa_recommend_list.extend(after_filter_qa)
                            redis_client.hset(answer_key, "tail", "1")
                            redis_client.expire(answer_key, 3 * 60 * 60)
                            search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                            read_history(search_qa_recommend_list)
                            return search_qa_recommend_list
                        elif len(after_filter_qa) > size:
                            # Serve a page and advance the cursor to the
                            # first unserved (post-filter) id.
                            search_qa_recommend_list.extend(after_filter_qa[:size])
                            search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                            read_history(search_qa_recommend_list)
                            if after_filter_qa[size] in old_qa:
                                redis_client.hset(answer_key, "location",
                                                  str(old_qa.index(after_filter_qa[size])))
                                redis_client.expire(answer_key, 3 * 60 * 60)
                            else:
                                redis_client.hset(answer_key, "tail", "1")
                                redis_client.expire(answer_key, 3 * 60 * 60)
                            return search_qa_recommend_list
                else:
                    # No cursor state yet: start from the head of the queue.
                    try:
                        que = DeviceQAQueue.objects.get(device_id=device_id)
                    except DeviceQAQueue.DoesNotExist:
                        que = AnswerQueue.objects.last()
                    if not que:
                        if len(search_qa_recommend_list) > 0:
                            search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                            read_history(search_qa_recommend_list)
                        return search_qa_recommend_list
                    old_qa = list(filter(None, que.queue.split(',')))
                    after_filter_qa = filter_qa(device_id, old_qa)
                    if len(after_filter_qa) == 0:
                        redis_client.hset(answer_key, "tail", "1")
                        redis_client.expire(answer_key, 3 * 60 * 60)
                        if len(search_qa_recommend_list) > 0:
                            search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                            read_history(search_qa_recommend_list)
                        else:
                            search_qa_recommend_list = no_filter_qa(size)
                            if len(search_qa_recommend_list) > 0:
                                read_history(search_qa_recommend_list)
                        return search_qa_recommend_list
                    elif len(after_filter_qa) <= size:
                        search_qa_recommend_list.extend(after_filter_qa)
                        search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                        redis_client.hset(answer_key, "tail", "1")
                        redis_client.expire(answer_key, 3 * 60 * 60)
                        read_history(search_qa_recommend_list)
                        return search_qa_recommend_list
                    else:
                        search_qa_recommend_list.extend(after_filter_qa[:size])
                        if after_filter_qa[size] in old_qa:
                            redis_client.hset(answer_key, "location",
                                              str(old_qa.index(after_filter_qa[size])))
                            redis_client.expire(answer_key, 3 * 60 * 60)
                            search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                            read_history(search_qa_recommend_list)
                        else:
                            redis_client.hset(answer_key, "tail", "1")
                            redis_client.expire(answer_key, 3 * 60 * 60)
                        return search_qa_recommend_list
            else:
                # Anonymous/placeholder device: unfiltered queue path.
                data = no_filter_qa(size)
                return data
        except:
            logging_exception()
            return []
    @staticmethod
    def fetch_user_topic(device_id, card_type, size):
        """Build one page of recommended user-topic (tractate) ids.

        Same three-stage pipeline as fetch_qa, with tractate-specific redis
        keys: portrait recall -> up to two ids from the search-recommend
        queue -> persisted DeviceUserTopicQueue / UserTopicQueue with a
        location/tail cursor in the "tractate_location" hash. Results are
        additionally de-duplicated (order-preserving) via list_distinct.
        Errors are logged and an empty list returned.

        NOTE(review): nesting of a few branches was reconstructed from an
        unformatted dump; semantics follow the token order.
        """
        try:
            def filter_topic(cid_list):
                # Drop ids in the device's "_dislike_tractate" gm_kv set;
                # best-effort, returns input unchanged on any failure.
                try:
                    gmkv = None
                    for gm_kv_host_item in settings.GM_KV_HOSTS:
                        gmkv = get_gmkv(redis_ip=gm_kv_host_item["host"],
                                        redis_port=gm_kv_host_item["port"],
                                        redis_db=gm_kv_host_item["db"],
                                        redis_password=gm_kv_host_item["password"])
                        if gmkv:
                            break
                    if gmkv.exists(dislike_key):
                        dislike = gmkv.smembers(dislike_key)
                        if len(cid_list) > 0:
                            # smembers returns bytes; encode int/str ids.
                            if type(cid_list[0]) == int or type(cid_list[0]) == str:
                                cid_list = [i for i in cid_list
                                            if str(i).encode('utf-8') not in dislike]
                            else:
                                cid_list = [i for i in cid_list if i not in dislike]
                        return cid_list
                    else:
                        return cid_list
                except:
                    return cid_list

            def read_history(cid_list):
                # Track served ids in today's set and the rolling read set;
                # entries older than 14 days are subtracted out.
                redis_client.sadd(today_key, *cid_list)
                redis_client.expire(today_key, 14 * 24 * 60 * 60)
                redis_client.sadd(read_key, *cid_list)
                if redis_client.exists(old_key):
                    redis_client.sdiffstore(read_key, read_key, old_key)
                    redis_client.delete(old_key)
                redis_client.expire(read_key, time=13 * 24 * 60 * 60)

            def get_gmkv(redis_ip, redis_port, redis_db, redis_password=""):
                # Connect to one gm_kv redis host; None on failure.
                try:
                    if len(redis_password) == 0:
                        cli_ins = redis.Redis(host=redis_ip, port=redis_port,
                                              db=redis_db, socket_timeout=2)
                    else:
                        cli_ins = redis.Redis(host=redis_ip, port=redis_port,
                                              db=redis_db, password=redis_password,
                                              socket_timeout=2)
                    cli_ins.ping()
                    return cli_ins
                except:
                    return None

            def no_filter_get_topic(size):
                # Serve from the persisted queue without dislike filtering,
                # advancing a per-(device, card_type, day) cursor in redis.
                key = '{device_id}-{card_type}-{date}'.format(
                    device_id=device_id, card_type=card_type,
                    date=RecommendFeed.current_date())
                try:
                    que = DeviceUserTopicQueue.objects.get(device_id=device_id)
                except DeviceUserTopicQueue.DoesNotExist:
                    que = UserTopicQueue.objects.last()
                if not que:
                    return []
                que = list(filter(None, que.queue.split(',')))
                # adjust args.
                cursor = redis_client.get(key) or 0
                cursor = int(cursor) % len(que)
                size = min(size, len(que))
                data = list(islice(cycle(que), cursor, cursor + size))
                data = list(map(int, data))
                if cursor + 2 * size < len(que):
                    redis_client.set(key, cursor + size, ex=24 * 60 * 60)
                else:
                    # Near the end: request a queue rebuild and restart the
                    # cursor; on failure just advance it.
                    try:
                        context.request_logger.app(reset_queue=True)
                        cursor = 0
                        redis_client.set(key, cursor, ex=24 * 60 * 60)
                    except:
                        redis_client.set(key, cursor + size, ex=24 * 60 * 60)
                return data

            def list_distinct(ids):
                # Order-preserving de-duplication.
                news_ids = []
                for id in ids:
                    if id not in news_ids:
                        news_ids.append(id)
                return news_ids

            dislike_key = str(device_id) + "_dislike_tractate"
            search_topic_recommend_key = "TS:search_recommend_tractate_queue:device_id:" + str(device_id)
            # Hash holding the queue cursor ("location") / exhaustion ("tail").
            tractate_key = "tractate_location:" + str(device_id)
            # Rolling read-history keys (overall / 14-days-ago / today).
            read_key = "TS:recommend_tractate_set:device_id:" + str(device_id)
            old_key = "TS:recommend_tractate_set:device_id:{}:{}" \
                .format(device_id, (datetime.date.today() - datetime.timedelta(days=14)).strftime("%Y-%m-%d"))
            today_key = "TS:recommend_tractate_set:device_id:{}:{}" \
                .format(device_id, datetime.date.today().strftime("%Y-%m-%d"))
            search_list = list()
            time_begin = time.time()
            if device_id != '0':
                # if recommed_service_category_device_id(device_id):
                have_read_list = list()
                if redis_client.exists(read_key):
                    have_read_list = [int(i) for i in redis_client.smembers(read_key)]
                # Step 1: portrait-based recall, excluding read ids.
                topic_list = fetch_topic_by_user_portrait(device_id, have_read_list, size)
                if topic_list:
                    read_history(topic_list)
                    if len(topic_list) >= size:
                        logging.info("duan add,user_topic time cost:%f" % (time.time() - time_begin))
                        return topic_list
                    else:
                        search_list.extend(topic_list)
                        size = size - len(topic_list)
                logger.info("portrait_fetch_topic:supplement1:device_id:{0}:size:{1}".format(device_id, size))
                # Step 2: take up to two ids from the search-recommend queue.
                if redis_client.exists(search_topic_recommend_key):
                    search_topic_recommend_dict = redis_client.hgetall(search_topic_recommend_key)
                    search_topic_recommend_list = json.loads(search_topic_recommend_dict[b'tractate_queue'])
                    search_topic_recommend_list = filter_topic(search_topic_recommend_list)
                    if len(search_topic_recommend_list) == 0:
                        redis_client.delete(search_topic_recommend_key)
                    elif len(search_topic_recommend_list) <= 2:
                        search_list.extend(search_topic_recommend_list)
                        size = size - len(search_list)
                        redis_client.delete(search_topic_recommend_key)
                    else:
                        search_list.extend(search_topic_recommend_list[:2])
                        size = size - 2
                        redis_client.hset(search_topic_recommend_key, 'tractate_queue',
                                          json.dumps(search_topic_recommend_list[2:]))
                # Step 3: top up from the persisted queue via cursor state.
                if redis_client.exists(tractate_key):
                    if b'tail' in redis_client.hgetall(tractate_key):
                        if len(search_list) == 0:
                            data = no_filter_get_topic(size)
                            search_list.extend(data)
                        search_list = list(map(int, search_list))
                        read_history(search_list)
                        search_list = list_distinct(search_list)
                        return search_list
                    elif b'location' in redis_client.hgetall(tractate_key):
                        try:
                            que = DeviceUserTopicQueue.objects.get(device_id=device_id)
                        except DeviceUserTopicQueue.DoesNotExist:
                            que = UserTopicQueue.objects.last()
                        if not que:
                            if len(search_list) > 0:
                                search_list = list(map(int, search_list))
                                read_history(search_list)
                                search_list = list_distinct(search_list)
                            return search_list
                        old_qa = list(filter(None, que.queue.split(',')))
                        location = int(redis_client.hgetall(tractate_key)[b'location'])
                        after_filter_qa = filter_topic(old_qa[location:])
                        if (location >= len(old_qa) - 1) or (len(after_filter_qa) == 0):
                            # Cursor at (or past) the end: mark exhausted.
                            redis_client.hset(tractate_key, "tail", "1")
                            redis_client.expire(tractate_key, 3 * 60 * 60)
                            if len(search_list) == 0:
                                search_list = no_filter_get_topic(size)
                            search_list = list(map(int, search_list))
                            read_history(search_list)
                            search_list = list_distinct(search_list)
                            return search_list
                        elif len(after_filter_qa) <= size:
                            # Remainder fits this page: drain and mark tail.
                            search_list.extend(after_filter_qa)
                            redis_client.hset(tractate_key, "tail", "1")
                            redis_client.expire(tractate_key, 3 * 60 * 60)
                            search_list = list(map(int, search_list))
                            read_history(search_list)
                            search_list = list_distinct(search_list)
                            return search_list
                        elif len(after_filter_qa) > size:
                            # Serve a page and advance the cursor.
                            search_list.extend(after_filter_qa[:size])
                            search_list = list(map(int, search_list))
                            read_history(search_list)
                            search_list = list_distinct(search_list)
                            if after_filter_qa[size] in old_qa:
                                redis_client.hset(tractate_key, "location",
                                                  str(old_qa.index(after_filter_qa[size])))
                                redis_client.expire(tractate_key, 3 * 60 * 60)
                            else:
                                redis_client.hset(tractate_key, "tail", "1")
                                redis_client.expire(tractate_key, 3 * 60 * 60)
                            return search_list
                else:
                    # No cursor state yet: start from the head of the queue.
                    try:
                        que = DeviceUserTopicQueue.objects.get(device_id=device_id)
                    except DeviceUserTopicQueue.DoesNotExist:
                        que = UserTopicQueue.objects.last()
                    if not que:
                        if len(search_list) > 0:
                            search_list = list(map(int, search_list))
                            read_history(search_list)
                            search_list = list_distinct(search_list)
                        return search_list
                    old_qa = list(filter(None, que.queue.split(',')))
                    after_filter_qa = filter_topic(old_qa)
                    if len(after_filter_qa) == 0:
                        redis_client.hset(tractate_key, "tail", "1")
                        redis_client.expire(tractate_key, 3 * 60 * 60)
                        if len(search_list) == 0:
                            search_list = no_filter_get_topic(size)
                        if len(search_list) > 0:
                            search_list = list(map(int, search_list))
                            read_history(search_list)
                            search_list = list_distinct(search_list)
                        return search_list
                    elif len(after_filter_qa) <= size:
                        search_list.extend(after_filter_qa)
                        search_list = list(map(int, search_list))
                        redis_client.hset(tractate_key, "tail", "1")
                        redis_client.expire(tractate_key, 3 * 60 * 60)
                        read_history(search_list)
                        search_list = list_distinct(search_list)
                        return search_list
                    else:
                        search_list.extend(after_filter_qa[:size])
                        if after_filter_qa[size] in old_qa:
                            redis_client.hset(tractate_key, "location",
                                              str(old_qa.index(after_filter_qa[size])))
                            redis_client.expire(tractate_key, 3 * 60 * 60)
                            search_list = list(map(int, search_list))
                            search_list = list_distinct(search_list)
                            read_history(search_list)
                        else:
                            redis_client.hset(tractate_key, "tail", "1")
                            redis_client.expire(tractate_key, 3 * 60 * 60)
                        return search_list
            else:
                # Anonymous/placeholder device: unfiltered queue path.
                data = no_filter_get_topic(size)
                return data
        except:
            logging_exception()
            return []
    @classmethod
    def fetch_diary_queue_data(cls, city_id, device_id=None):
        """Load the four diary queues for a city from the DiaryQueue table.

        Looks up rows for *city_id* and the 'world' fallback, prefers the
        city-specific row, and splits each comma-separated queue column.
        *device_id* is currently unused in the live path (it belonged to the
        commented-out gm_kv variant below).

        Returns:
            tuple: (local, nearby, nation, megacity, use_city_id) — lists of
            id strings plus the city id actually used. On any error the
            lists are empty and the original city_id is returned.
        """
        local = list()
        nearby = list()
        nation = list()
        megacity = list()
        use_city_id = city_id
        try:
            qs = DiaryQueue.objects.filter(city_id__in=[city_id, 'world'])
            # Assume that world queue must exist.
            if len(qs) == 1:
                obj = qs[0]
            else:
                # Two rows returned: pick the city-specific one.
                obj = qs[0] if qs[0].city_id == city_id else qs[1]
            if obj.native_queue:
                local = list(filter(None, obj.native_queue.split(',')))
            if obj.nearby_queue:
                nearby = list(filter(None, obj.nearby_queue.split(',')))
            if obj.nation_queue:
                nation = list(filter(None, obj.nation_queue.split(',')))
            if obj.megacity_queue:
                megacity = list(filter(None, obj.megacity_queue.split(',')))
            use_city_id = obj.city_id if obj else use_city_id
            return (local, nearby, nation, megacity, use_city_id)
        except:
            # Empty qs raises IndexError at qs[0]; treated as "no queue".
            logging_exception()
            logger.error("catch exception,err_log:%s" % traceback.format_exc())
            return [], [], [], [], use_city_id
        # Legacy gm_kv implementation, kept for reference:
        # gm_kv_ins = None
        # for gm_kv_host_item in settings.GM_KV_HOSTS:
        #     gm_kv_ins = cls.get_gm_kv_ins(redis_ip=gm_kv_host_item["host"], redis_port=gm_kv_host_item["port"], redis_db=gm_kv_host_item["db"],redis_password=gm_kv_host_item["password"])
        #     if gm_kv_ins:
        #         break
        #
        # specify_city_id_key = "diary_queue:city_id:" + use_city_id
        # world_city_id_key = "diary_queue:city_id:world"
        # if device_id is not None:
        #     specify_city_id_key = "device_diary_queue:device_id:" + device_id + ":city_id:" + use_city_id
        #
        # city_val_dict = gm_kv_ins.hgetall(specify_city_id_key)
        # if len(city_val_dict) == 0:
        #     city_val_dict = gm_kv_ins.hgetall(world_city_id_key)
        #     use_city_id = "world"
        #
        # if b"native_queue" in city_val_dict and city_val_dict[b"native_queue"]:
        #     local = list(filter(None, city_val_dict[b"native_queue"].split(b",")))
        # if b"nearby_queue" in city_val_dict and city_val_dict[b"nearby_queue"]:
        #     nearby = list(filter(None, city_val_dict[b"nearby_queue"].split(b",")))
        # if b"nation_queue" in city_val_dict and city_val_dict[b"nation_queue"]:
        #     nation = list(filter(None, city_val_dict[b"nation_queue"].split(b",")))
        # if b"megacity_queue" in city_val_dict and city_val_dict[b"megacity_queue"]:
        #     megacity = list(filter(None, city_val_dict[b"megacity_queue"].split(b",")))
        #
        # return (local, nearby, nation, megacity, use_city_id)
        #
        # except:
@classmethod
def
fetch_diary_queue_from_redis
(
cls
,
city_id
):
local
=
list
()
nearby
=
list
()
nation
=
list
()
megacity
=
list
()
use_city_id
=
city_id
try
:
specify_city_id_key
=
"diary_queue:city_id:"
+
use_city_id
world_city_id_key
=
"diary_queue:city_id:world"
city_val_dict
=
redis_client2
.
hgetall
(
specify_city_id_key
)
if
len
(
city_val_dict
)
==
0
:
city_val_dict
=
redis_client2
.
hgetall
(
world_city_id_key
)
use_city_id
=
"world"
if
b
"native_queue"
in
city_val_dict
and
city_val_dict
[
b
"native_queue"
]:
local
=
list
(
filter
(
None
,
city_val_dict
[
b
"native_queue"
]
.
split
(
b
","
)))
if
b
"nearby_queue"
in
city_val_dict
and
city_val_dict
[
b
"nearby_queue"
]:
nearby
=
list
(
filter
(
None
,
city_val_dict
[
b
"nearby_queue"
]
.
split
(
b
","
)))
if
b
"nation_queue"
in
city_val_dict
and
city_val_dict
[
b
"nation_queue"
]:
nation
=
list
(
filter
(
None
,
city_val_dict
[
b
"nation_queue"
]
.
split
(
b
","
)))
if
b
"megacity_queue"
in
city_val_dict
and
city_val_dict
[
b
"megacity_queue"
]:
megacity
=
list
(
filter
(
None
,
city_val_dict
[
b
"megacity_queue"
]
.
split
(
b
","
)))
return
(
local
,
nearby
,
nation
,
megacity
,
use_city_id
)
except
:
logging_exception
()
logger
.
error
(
"catch exception,err_log:
%
s"
%
traceback
.
format_exc
())
return
[],[],[],[],
use_city_id
    @classmethod
    def fetch_device_diary_queue_data(cls, city_id, device_id):
        """Load the per-device diary queues from the DeviceDiaryQueue table.

        Fetches the first row matching (device_id, city_id) and splits each
        comma-separated queue column; all guards tolerate a missing row.

        Returns:
            tuple: (local, nearby, nation, megacity, use_city_id) — lists of
            id strings plus the city id actually used. On any error the
            lists are empty and the original city_id is returned.
        """
        local = list()
        nearby = list()
        nation = list()
        megacity = list()
        use_city_id = city_id
        try:
            obj = DeviceDiaryQueue.objects.filter(device_id=device_id, city_id=city_id).first()
            if obj and obj.native_queue:
                local = list(filter(None, obj.native_queue.split(',')))
            if obj and obj.nearby_queue:
                nearby = list(filter(None, obj.nearby_queue.split(',')))
            if obj and obj.nation_queue:
                nation = list(filter(None, obj.nation_queue.split(',')))
            if obj and obj.megacity_queue:
                megacity = list(filter(None, obj.megacity_queue.split(',')))
            use_city_id = obj.city_id if obj else use_city_id
            return (local, nearby, nation, megacity, use_city_id)
        except:
            logging_exception()
            logger.error("catch exception,err_log:%s" % traceback.format_exc())
            return [], [], [], [], use_city_id
        # Legacy gm_kv implementation, kept for reference:
        # gm_kv_ins = None
        # for gm_kv_host_item in settings.GM_KV_HOSTS:
        #     gm_kv_ins = cls.get_gm_kv_ins(redis_ip=gm_kv_host_item["host"], redis_port=gm_kv_host_item["port"], redis_db=gm_kv_host_item["db"],redis_password=gm_kv_host_item["password"])
        #     if gm_kv_ins:
        #         break
        #
        # specify_city_id_key = "device_diary_queue:device_id:" + device_id + ":city_id:" + use_city_id
        # city_val_dict = gm_kv_ins.hgetall(specify_city_id_key)
        #
        # # Check whether the device is in the grey-release group.
        # if recommed_service_category_device_id(device_id):
        #     if b"new_native_queue" in city_val_dict and city_val_dict[b"new_native_queue"]:
        #         local = list(filter(None, city_val_dict[b"new_native_queue"].split(b",")))
        # else:
        #     if b"native_queue" in city_val_dict and city_val_dict[b"native_queue"]:
        #         local = list(filter(None, city_val_dict[b"native_queue"].split(b",")))
        #
        # if b"nearby_queue" in city_val_dict and city_val_dict[b"nearby_queue"]:
        #     nearby = list(filter(None, city_val_dict[b"nearby_queue"].split(b",")))
        # if b"nation_queue" in city_val_dict and city_val_dict[b"nation_queue"]:
        #     nation = list(filter(None, city_val_dict[b"nation_queue"].split(b",")))
        # if b"megacity_queue" in city_val_dict and city_val_dict[b"megacity_queue"]:
        #     megacity = list(filter(None, city_val_dict[b"megacity_queue"].split(b",")))
        #
        # return (local, nearby, nation, megacity, use_city_id)
        # except:
@classmethod
def fetch_diary(cls, device_id, card_type, city_id, size):
    """Build one page of diary-card ids for a device.

    Sources are tried in priority order, each one decrementing ``size``:
    1. CPC (paid) queue from the user-portrait Redis hash.
    2. User-portrait recall (``fetch_diary_by_user_portrait``).
    3. Search-behavior recommend queue.
    4. Click-behavior (TS) recommend queue.
    5. City-level diary queues (local/nearby/megacity/nation) with a
       cursor persisted in Redis.

    Returns a ``(diary_id_list, cpc_id_list)`` tuple; on any exception the
    error is logged and ``([], [])`` is returned.

    NOTE(review): ``card_type`` is accepted but never read in this body —
    presumably kept for interface compatibility; confirm with callers.
    """
    try:
        def read_history(cid_list):
            # Record served ids so they are not recommended again.
            # read_key is the rolling "seen" set; a per-day set is also kept
            # so that entries older than 14 days can be subtracted out.
            read_key = "TS:recommend_diary_set:device_id:" + str(device_id)
            old_key = "TS:recommend_diary_set:device_id:{}:{}" \
                .format(device_id, (datetime.date.today() - datetime.timedelta(days=14)).strftime("%Y-%m-%d"))
            today_key = "TS:recommend_diary_set:device_id:{}:{}" \
                .format(device_id, datetime.date.today().strftime("%Y-%m-%d"))
            redis_client.sadd(today_key, *cid_list)
            redis_client.expire(today_key, 14 * 24 * 60 * 60)
            redis_client.sadd(read_key, *cid_list)
            if redis_client.exists(old_key):
                # Drop the 14-day-old slice from the rolling set, then
                # delete the expired per-day set.
                redis_client.sdiffstore(read_key, read_key, old_key)
                redis_client.delete(old_key)
            redis_client.expire(read_key, time=13 * 24 * 60 * 60)

        def dislike_cid_filter(device_id, cid_list):
            # Remove ids the device has explicitly disliked (stored in a
            # gm_kv set). Best-effort: any failure returns the input as-is.
            try:
                gmkv = None
                for gm_kv_host_item in settings.GM_KV_HOSTS:
                    gmkv = cls.get_gm_kv_ins(redis_ip=gm_kv_host_item["host"], redis_port=gm_kv_host_item["port"], redis_db=gm_kv_host_item["db"], redis_password=gm_kv_host_item["password"])
                    if gmkv:
                        break
                key = str(device_id) + "_dislike_diary"
                if gmkv.exists(key):
                    dislike = gmkv.smembers(key)
                    if len(cid_list) > 0:
                        if type(cid_list[0]) == int or type(cid_list[0]) == str:
                            # smembers returns bytes, so encode before testing.
                            cid_list = [i for i in cid_list if str(i).encode('utf-8') not in dislike]
                        else:
                            cid_list = [i for i in cid_list if i not in dislike]
                return cid_list
            except:
                return cid_list

        def get_cids(location, cursor_list, n, cid_list):
            # Take up to n un-disliked ids from cid_list starting at offset
            # ``location``. Appends the next cursor to cursor_list; the
            # sentinel 6666 means "this queue is exhausted".
            new_list = []
            if n == 0:
                cursor_list.append(6666)
            else:
                if location >= len(cid_list) - 1:
                    cursor_list.append(6666)
                else:
                    local_filter = dislike_cid_filter(device_id, cid_list[location:])
                    if len(local_filter) == 0:
                        cursor_list.append(6666)
                    elif len(local_filter) <= n:
                        cursor_list.append(6666)
                        new_list = local_filter
                    else:
                        new_list = local_filter[:n]
                        # Cursor is the position (in the unfiltered queue) of
                        # the first id not served this round.
                        cursor = cid_list.index(local_filter[n])
                        cursor_list.append(cursor)
            return new_list, cursor_list

        def get_data(local, nearby, nation, megacity, cx, cy, cm, cz, x, y, z, m, size):
            # Split ``size`` slots between the four city-level queues in
            # proportion to the weights x/y/z/m, rounding each share.
            nx = int(round(x * 1.0 / (x + y + z + m) * size))
            ny = int(round(y * 1.0 / (x + y + z + m) * size))
            nz = int(round(z * 1.0 / (x + y + z + m) * size))
            nm = int(round(m * 1.0 / (x + y + z + m) * size))
            nxyz = [nx, ny, nm, nz]
            xyz = [x, y, m, z]
            counter = Counter([nx, ny, nm, nz])
            # Remainder rule: with exactly two zero shares, give the leftover
            # to the first zero slot; otherwise to the largest-weight slot.
            if counter[0] == 2:
                nxyz[nxyz.index(0)] += size - sum(nxyz)
            else:
                nxyz[xyz.index(max(xyz))] += size - sum(nxyz)
            nx, ny, nm, nz = nxyz
            cursor_list = []
            # Shortfall in one level spills over into the next one.
            slocal, cursor_list = get_cids(cx, cursor_list, nx, local)
            ny += (nx - len(slocal))
            snearby, cursor_list = get_cids(cy, cursor_list, ny, nearby)
            nm += (ny - len(snearby))
            smegacity, cursor_list = get_cids(cm, cursor_list, nm, megacity)
            nz += (nm - len(smegacity))
            snation, cursor_list = get_cids(cz, cursor_list, nz, nation)
            if cursor_list[0] == 6666 and cursor_list[1] == 6666 and cursor_list[2] == 6666 and cursor_list[3] == 6666:
                # Every queue exhausted: mark the tail flag for this device/city.
                redis_client.hset(diary_key, "tail", "1")
                redis_client.expire(diary_key, 6 * 60 * 60)
            else:
                # Persist the four cursors for the next page.
                redis_client.hset(diary_key, "location", json.dumps(cursor_list))
                redis_client.expire(diary_key, 6 * 60 * 60)
            total = list()
            total.extend(slocal)
            total.extend(snearby)
            total.extend(smegacity)
            total.extend(snation)
            return total
            # return chain(slocal, snearby, smegacity, snation)

        time_begin = time.time()
        if device_id != '0':
            cpc_list = []
            portrait_list = list()
            click_diary_size = 1
            search_diary_size = 4
            user_portrait_diary_key = 'user_portrait_recommend_diary_queue:device_id:%s:%s' % \
                                      (device_id, datetime.datetime.now().strftime('%Y-%m-%d'))
            if redis_client.exists(user_portrait_diary_key):
                user_portrait_diary_dict = redis_client.hgetall(user_portrait_diary_key)
                if b'cpc_queue' in user_portrait_diary_dict.keys():
                    # CPC (paid) ids are consumed first and returned alongside
                    # the organic list.
                    cpc_queue = json.loads(user_portrait_diary_dict[b'cpc_queue'])
                    if len(cpc_queue) > size:
                        # Enough CPC ids for the whole page; keep the rest.
                        cpc_list.extend(cpc_queue[:size])
                        portrait_list.extend(cpc_queue[:size])
                        redis_client.hset(user_portrait_diary_key, 'cpc_queue', json.dumps(cpc_queue[size:]))
                        redis_client.expire(user_portrait_diary_key, 24 * 60 * 60)
                        portrait_list = list(map(int, portrait_list))
                        read_history(portrait_list)
                        logging.info("duan add,diary time cost:%f" % (time.time() - time_begin))
                        return portrait_list, cpc_list
                    elif len(cpc_queue) == size:
                        # Exactly one page: consume the whole queue.
                        cpc_list.extend(cpc_queue)
                        portrait_list.extend(cpc_queue)
                        redis_client.hdel(user_portrait_diary_key, 'cpc_queue')
                        redis_client.expire(user_portrait_diary_key, 24 * 60 * 60)
                        portrait_list = list(map(int, portrait_list))
                        read_history(portrait_list)
                        logging.info("duan add,diary time cost:%f" % (time.time() - time_begin))
                        return portrait_list, cpc_list
                    else:
                        # Fewer CPC ids than requested: take them all and
                        # fill the remaining slots from later sources.
                        cpc_list.extend(cpc_queue)
                        portrait_list.extend(cpc_queue)
                        redis_client.hdel(user_portrait_diary_key, 'cpc_queue')
                        redis_client.expire(user_portrait_diary_key, 24 * 60 * 60)
                        size = size - len(cpc_list)
            # If not in the grayscale (A/B) group, only then take diary_queue
            # if not recommed_service_category_device_id(device_id):
            #     if b'diary_queue' in user_portrait_diary_dict.keys():
            #         user_portrait_diary_list = json.loads(user_portrait_diary_dict[b'diary_queue'])
            #         filter_user_portrait_diary_list = dislike_cid_filter(device_id,
            #                                                              user_portrait_diary_list)
            #         if len(filter_user_portrait_diary_list) > size:
            #             portrait_list.extend(filter_user_portrait_diary_list[:size])
            #             redis_client.hset(user_portrait_diary_key, 'diary_queue',
            #                               json.dumps(filter_user_portrait_diary_list[size:]))
            #             redis_client.expire(user_portrait_diary_key, 24 * 60 * 60)
            #             portrait_list = list(map(int, portrait_list))
            #             read_history(portrait_list)
            #             logging.info("duan add,diary time cost:%f" % (time.time() - time_begin))
            #             return portrait_list,cpc_list
            #         else:
            #             size = size - len(filter_user_portrait_diary_list)
            #             portrait_list.extend(filter_user_portrait_diary_list)
            #             redis_client.delete(user_portrait_diary_key)
            # user-portrait recall experiment
            # if recommed_service_category_device_id(device_id):
            if size > 0:
                # city tag id
                x = City.objects.filter(id=city_id)
                city_tag_id = x[0].tag_id if x else -1
                # already-read set: exclude everything served before.
                read_key = "TS:recommend_diary_set:device_id:" + str(device_id)
                have_read_diary_list = list()
                if redis_client.exists(read_key):
                    p = redis_client.smembers(read_key)
                    have_read_diary_list = list(map(int, p))
                have_read_diary_list.extend(portrait_list)
                # recall
                diary_list = fetch_diary_by_user_portrait(device_id, city_tag_id, have_read_diary_list, size)
                size = size - len(diary_list)
                portrait_list.extend(diary_list)
                if portrait_list:
                    read_history(portrait_list)
                logger.info("portrait_fetch_diary:device_id:{0}:portrait_list:{1}:cpc_list:{2}".format(device_id, portrait_list, cpc_list))
                if size <= 0:
                    return portrait_list, cpc_list
            # Still short: supplement from the search-behavior queue.
            logger.info("portrait_fetch_diary:supplement1:device_id:{0}:size:{1}".format(device_id, size))
            search_diary_recommend_key = "TS:search_recommend_diary_queue:device_id:" + str(device_id)
            search_list = list()
            if redis_client.exists(search_diary_recommend_key) and size > 3:
                search_diary_recommend_dict = redis_client.hgetall(search_diary_recommend_key)
                search_diary_recommend_list = json.loads(search_diary_recommend_dict[b'diary_queue'])
                search_diary_recommend_list = dislike_cid_filter(device_id, search_diary_recommend_list)
                if len(search_diary_recommend_list) == 0:
                    redis_client.delete(search_diary_recommend_key)
                elif len(search_diary_recommend_list) <= search_diary_size:
                    search_list.extend(search_diary_recommend_list)
                    size = size - len(search_diary_recommend_list)
                    redis_client.delete(search_diary_recommend_key)
                else:
                    # Take at most search_diary_size ids, keep the remainder.
                    search_list.extend(search_diary_recommend_list[:search_diary_size])
                    size = size - search_diary_size
                    redis_client.hset(search_diary_recommend_key, 'diary_queue', json.dumps(search_diary_recommend_list[search_diary_size:]))
                    redis_client.expire(search_diary_recommend_key, 24 * 60 * 60)
            if size <= 0:
                portrait_list.extend(search_list)
                portrait_list = list(map(int, portrait_list))
                read_history(portrait_list)
                return portrait_list, cpc_list
            # Still short: supplement from the click-behavior (TS) queue.
            diary_recommend_key = "TS:recommend_diary_queue:device_id:" + str(device_id)
            ts_recommend_list = list()
            if redis_client.exists(diary_recommend_key) and size > 0:
                diary_recommend_dict = redis_client.hgetall(diary_recommend_key)
                diary_recommend_list = json.loads(diary_recommend_dict[b'diary_queue'])
                diary_recommend_list = dislike_cid_filter(device_id, diary_recommend_list)
                if len(diary_recommend_list) == 0:
                    redis_client.delete(diary_recommend_key)
                elif len(diary_recommend_list) <= click_diary_size:
                    ts_recommend_list = diary_recommend_list
                    redis_client.delete(diary_recommend_key)
                    size = size - len(ts_recommend_list)
                else:
                    size = size - click_diary_size
                    ts_recommend_list = diary_recommend_list[:click_diary_size]
                    diary_recommend_list_json = json.dumps(diary_recommend_list[click_diary_size:])
                    redis_client.hset(diary_recommend_key, 'diary_queue', diary_recommend_list_json)
                    redis_client.expire(diary_recommend_key, 24 * 60 * 60)
            if size <= 0:
                portrait_list.extend(search_list)
                portrait_list.extend(ts_recommend_list)
                portrait_list = list(map(int, portrait_list))
                read_history(portrait_list)
                return portrait_list, cpc_list
            if size > 0:
                # Final fill from the city-level queues, driven by the cursor
                # hash stored under diary_key.
                diary_key = "diary_device_city:" + str(device_id) + str(city_id)
                portrait_list.extend(search_list)
                portrait_list.extend(ts_recommend_list)
                cx = 0
                cy = 0
                cz = 0
                cm = 0
                if redis_client.exists(diary_key):
                    if b"tail" in redis_client.hgetall(diary_key):
                        # Queues already exhausted for this device/city; fall
                        # back to the unfiltered queue if nothing collected.
                        if len(portrait_list) == 0:
                            portrait_list = cls.no_filter_get_diary(city_id, device_id, size)
                        if len(portrait_list) > 0:
                            read_history(portrait_list)
                        return portrait_list, cpc_list
                    elif b"location" in redis_client.hgetall(diary_key):
                        # Resume from the stored cursors.
                        location_list = json.loads(redis_client.hgetall(diary_key)[b"location"])
                        cx = location_list[0]
                        cy = location_list[1]
                        cm = location_list[2]
                        cz = location_list[3]
                (local, nearby, nation, megacity, city_id) = cls.fetch_diary_queue_from_redis(city_id)
                x, y, m, z = cls.get_city_scale(city_id)
                data = get_data(local, nearby, nation, megacity, cx, cy, cm, cz, x, y, z, m, size)
                portrait_list.extend(data)
                # if len(portrait_list) == 0:
                #     (local, nearby, nation, megacity, city_id) = cls.fetch_diary_queue_data(city_id)
                #     portrait_list = cls.get_queue(local, nearby, nation, megacity,
                #                                   device_id, city_id, size, x, y, z, m)
                portrait_list = list(map(int, portrait_list))
                if len(portrait_list) != 0:
                    read_history(portrait_list)
                return portrait_list, cpc_list
        else:
            # device_id == '0': anonymous device, serve the unfiltered queue.
            data = cls.no_filter_get_diary(city_id, device_id, size)
            return data, []
    except:
        logging.error("catch exception,err_log:%s" % traceback.format_exc())
        logging_exception()
        return [], []
@classmethod
def no_filter_get_diary(cls, city_id, device_id, size):
    """Serve one page of diary ids from the city-level queues without any
    dislike/read filtering.

    A per-device-per-city-per-day cursor (``cx-cy-cm-cz``) is kept in Redis
    and advanced on every call; when the local and nearby queues stop
    advancing the cursor is reset to the beginning. Returns a list of ints.
    """
    # try:
    #     (local, nearby, nation, megacity, city_id) = cls.fetch_device_diary_queue_data(city_id, device_id)
    #     if len(local) == 0 and len(nearby) == 0 and len(nation) == 0 and len(megacity) == 0:
    #         (local, nearby, nation, megacity, city_id) = cls.fetch_diary_queue_data(city_id)
    # except:
    #     logging.error("catch exception,err_log:%s" % traceback.format_exc())
    #     logging_exception()
    #     (local, nearby, nation, megacity, city_id) = cls.fetch_diary_queue_data(city_id)
    (local, nearby, nation, megacity, city_id) = cls.fetch_diary_queue_from_redis(city_id)
    key = '{device_id}-{city_id}-{date}'.format(device_id=device_id, city_id=city_id, date=RecommendFeed.current_date())
    # strategy rule: when user refresh over 30 loadings, reset native nearby nation queue cursor.
    counter_key = key + '-counter_v1'
    counter = redis_client.incr(counter_key)
    if counter == 1:
        # First refresh of the day: give the counter a 24h lifetime.
        redis_client.expire(counter_key, 24 * 60 * 60)
    cursor_key = key + '-cursor_v1'
    cursor = redis_client.get(cursor_key) or b'0-0-0-0'
    # if counter > 30:
    #     cursor = b'0-0-0-0'
    #     redis_client.delete(counter_key)
    cx, cy, cm, cz = map(int, cursor.split(b'-'))
    x, y, m, z = cls.get_city_scale(city_id)
    data, ncx, ncy, ncm, ncz = cls.get_scale_data(local, nearby, nation, megacity, cx, cy, cm, cz, x, y, z, m, size)
    if ncx == cx and ncy == cy:  # native queue and nearby queue
        # Neither cursor moved: both primary queues are exhausted, start over.
        logger.info("diary queue reach end,cx:%d,cy:%d,cm:%d,cz:%d", cx, cy, cm, cz)
        # redis_client.delete(counter_key)
        # data, ncx, ncy, ncm, ncz = cls.get_scale_data(
        #     local, nearby, nation, megacity,
        #     0, 0, 0, 0,
        #     x, y, z, m, size
        # )
        ncx = ncy = ncm = ncz = 0
    val = '-'.join(map(str, [ncx, ncy, ncm, ncz]))
    redis_client.set(cursor_key, val, ex=24 * 60 * 60)
    data = list(map(int, data))
    return data
@classmethod
def get_queue(cls, local, nearby, nation, megacity, device_id, city_id, size, x, y, z, m):
    """Page through the four pre-built city-level diary queues.

    Same cursor bookkeeping as ``no_filter_get_diary`` (per device/city/day
    ``cx-cy-cm-cz`` cursor in Redis) but the queues and scale weights are
    supplied by the caller. Returns a list of int diary ids.
    """
    key = '{device_id}-{city_id}-{date}'.format(device_id=device_id, city_id=city_id, date=RecommendFeed.current_date())
    counter_key = key + '-counter_v1'
    counter = redis_client.incr(counter_key)
    if counter == 1:
        # First refresh of the day: give the counter a 24h lifetime.
        redis_client.expire(counter_key, 24 * 60 * 60)
    cursor_key = key + '-cursor_v1'
    cursor = redis_client.get(cursor_key) or b'0-0-0-0'
    cx, cy, cm, cz = map(int, cursor.split(b'-'))

    def get_scale(local, nearby, nation, megacity, cx, cy, cm, cz, x, y, z, m, size):
        # Proportional split of ``size`` slots across the four queues
        # (same algorithm as ``get_scale_data``).
        nx = int(round(x * 1.0 / (x + y + z + m) * size))
        ny = int(round(y * 1.0 / (x + y + z + m) * size))
        nz = int(round(z * 1.0 / (x + y + z + m) * size))
        nm = int(round(m * 1.0 / (x + y + z + m) * size))
        nxyz = [nx, ny, nm, nz]
        xyz = [x, y, m, z]
        counter = Counter([nx, ny, nm, nz])
        # Remainder rule: with exactly two zero shares, the first zero slot
        # gets the leftover; otherwise the largest-weight slot does.
        if counter[0] == 2:
            nxyz[nxyz.index(0)] += size - sum(nxyz)
        else:
            nxyz[xyz.index(max(xyz))] += size - sum(nxyz)
        nx, ny, nm, nz = nxyz
        # Each level's shortfall spills into the next level's quota.
        slocal = local[cx:cx + nx]
        cx = min(cx + nx, len(local))
        ny += (nx - len(slocal))
        snearby = nearby[cy:cy + ny]
        cy = min(cy + ny, len(nearby))
        nm += (ny - len(snearby))
        smegacity = megacity[cm:cm + nm]
        cm = min(cm + nm, len(megacity))
        nz += (nm - len(smegacity))
        snation = nation[cz:cz + nz]
        cz = min(cz + nz, len(nation))
        total = list()
        total.extend(slocal)
        total.extend(snearby)
        total.extend(smegacity)
        total.extend(snation)
        return total, cx, cy, cm, cz
        # return chain(slocal, snearby, smegacity, snation), cx, cy, cm, cz

    data, ncx, ncy, ncm, ncz = get_scale(local, nearby, nation, megacity, cx, cy, cm, cz, x, y, z, m, size)
    if ncx == cx and ncy == cy:
        # native queue and nearby queue did not advance: reset the cursor.
        logger.info("diary queue reach end,cx:%d,cy:%d,cm:%d,cz:%d", cx, cy, cm, cz)
        ncx = ncy = ncm = ncz = 0
    val = '-'.join(map(str, [ncx, ncy, ncm, ncz]))
    redis_client.set(cursor_key, val, ex=24 * 60 * 60)
    return list(map(int, data))
@staticmethod
def get_city_scale(city_id):
    """Return the ``(native, nearby, megacity, nation)`` weight tuple for a city.

    Prefers an explicit ``CityScale`` row; otherwise falls back to defaults
    keyed on the city's level, and finally to an all-nation split
    ``(0, 0, 0, 10)`` when the city is unknown.
    """
    try:
        scale = CityScale.objects.get(city_id=city_id)
        return scale.native, scale.nearby, scale.megacity, scale.nation
    except CityScale.DoesNotExist:
        fallback = (0, 0, 0, 10)
        try:
            city = City.objects.get(id=city_id)
        except City.DoesNotExist:
            return fallback
        # Level-based defaults when no explicit scale row exists.
        if city.level in (CITY_LEVEL.SUPER, CITY_LEVEL.ONE):
            return 4, 3, 0, 3
        if city.level == CITY_LEVEL.TWO:
            return 3, 3, 0, 3
        if city.level == CITY_LEVEL.THREE:
            return 1, 4, 0, 5
        return fallback
@staticmethod
def
get_scale_data
(
local
,
nearby
,
nation
,
megacity
,
cx
,
cy
,
cm
,
cz
,
x
,
y
,
z
,
m
,
size
):
"""
:param local: local diary queue
:param nearby: nearby diary queue
:param nation: nation diary queue
:param megacity: megacity diary queue
:param cx: seen local diary offset
:param cy: seen nearby diary offset
:param cz: seen nation diary offset
:param cm: seen megacity diary offset
:param x: local diary scale factor
:param y: nearby diary scale factor
:param z: nation diary scale factor
:param m: megacity diary scale factor
:param size: nubmer of diary
:return:
"""
# 本地 临近 特大城市 全国 四个层级 都按照的是四舍五入取得方式
# 针对出现的问题,本次相应的优化是:
# 1、如果出现两个层级为零,且有剩余坑位时,则按照本地 临近 全国的优先级,先给优先级高且为零的层级一个坑位。
# 2、如果所有层级都非零,且有剩余坑位时,则优先给权重占比大的层级一个坑位。
# 3、如果只有一个层级为零,且有剩余坑位时,则优先填充权重占比大的层级一个坑位。
nx
=
int
(
round
(
x
*
1.0
/
(
x
+
y
+
z
+
m
)
*
size
))
ny
=
int
(
round
(
y
*
1.0
/
(
x
+
y
+
z
+
m
)
*
size
))
nz
=
int
(
round
(
z
*
1.0
/
(
x
+
y
+
z
+
m
)
*
size
))
nm
=
int
(
round
(
m
*
1.0
/
(
x
+
y
+
z
+
m
)
*
size
))
nxyz
=
[
nx
,
ny
,
nm
,
nz
]
xyz
=
[
x
,
y
,
m
,
z
]
counter
=
Counter
([
nx
,
ny
,
nm
,
nz
])
if
counter
[
0
]
==
2
:
nxyz
[
nxyz
.
index
(
0
)]
+=
size
-
sum
(
nxyz
)
else
:
nxyz
[
xyz
.
index
(
max
(
xyz
))]
+=
size
-
sum
(
nxyz
)
nx
,
ny
,
nm
,
nz
=
nxyz
slocal
=
local
[
cx
:
cx
+
nx
]
cx
=
min
(
cx
+
nx
,
len
(
local
))
ny
+=
(
nx
-
len
(
slocal
))
snearby
=
nearby
[
cy
:
cy
+
ny
]
cy
=
min
(
cy
+
ny
,
len
(
nearby
))
nm
+=
(
ny
-
len
(
snearby
))
smegacity
=
megacity
[
cm
:
cm
+
nm
]
cm
=
min
(
cm
+
nm
,
len
(
megacity
))
nz
+=
(
nm
-
len
(
smegacity
))
snation
=
nation
[
cz
:
cz
+
nz
]
cz
=
min
(
cz
+
nz
,
len
(
nation
))
total
=
list
()
total
.
extend
(
slocal
)
total
.
extend
(
snearby
)
total
.
extend
(
smegacity
)
total
.
extend
(
snation
)
return
total
,
cx
,
cy
,
cm
,
cz
es_tag.py
View file @
d3e160fa
...
...
@@ -56,8 +56,13 @@ def es_query(doc, body, offset, size, es=None):
from_
=
offset
,
size
=
size
)
number
=
res
[
"hits"
][
"total"
]
return
number
# number = res["hits"]["total"]
total
=
res
[
"hits"
][
"hits"
]
print
(
total
)
r
=
[]
for
i
in
total
:
r
.
append
(
i
[
'_source'
][
'id'
])
return
r
def
answer
():
...
...
@@ -884,11 +889,11 @@ def topic():
"content_level"
:
3
}
},
{
"term"
:
{
"status"
:
"3"
}
}]
{
"term"
:
{
"status"
:
"3"
}
}]
}
}
...
...
@@ -917,11 +922,11 @@ def topic():
"content_level"
:
4
}
},
{
"term"
:
{
"status"
:
"3"
}
}]
{
"term"
:
{
"status"
:
"3"
}
}]
}
}
...
...
@@ -950,11 +955,11 @@ def topic():
"content_level"
:
5
}
},
{
"term"
:
{
"status"
:
"3"
}
}]
{
"term"
:
{
"status"
:
"3"
}
}]
}
}
...
...
@@ -983,11 +988,11 @@ def topic():
"content_level"
:
3
}
},
{
"term"
:
{
"status"
:
"3"
}
},
{
"term"
:
{
"is_video"
:
True
}}]
{
"term"
:
{
"status"
:
"3"
}
},
{
"term"
:
{
"is_video"
:
True
}}]
}
}
...
...
@@ -1016,11 +1021,11 @@ def topic():
"content_level"
:
4
}
},
{
"term"
:
{
"status"
:
"3"
}
},
{
"term"
:
{
"is_video"
:
True
}}]
{
"term"
:
{
"status"
:
"3"
}
},
{
"term"
:
{
"is_video"
:
True
}}]
}
}
...
...
@@ -1049,11 +1054,11 @@ def topic():
"content_level"
:
5
}
},
{
"term"
:
{
"status"
:
"3"
}
},
{
"term"
:
{
"is_video"
:
True
}}]
{
"term"
:
{
"status"
:
"3"
}
},
{
"term"
:
{
"is_video"
:
True
}}]
}
}
...
...
@@ -1063,20 +1068,56 @@ def topic():
tmp
.
append
(
es_query
(
category
,
video_star5_q
,
0
,
1
))
total_list
.
append
(
tmp
)
# print(i)
# print(tmp)
df
=
pd
.
DataFrame
(
total_list
)
df
=
df
.
rename
(
columns
=
{
0
:
"tag"
,
1
:
"star_3"
,
2
:
"star_4"
,
3
:
"star_5"
,
4
:
"video_star_3"
,
5
:
"video_star_4"
,
6
:
"video_star_5"
})
df
.
to_csv
(
"/home/gmuser/topic.csv"
,
index
=
False
,
encoding
=
"utf_8_sig"
)
def test_topic():
    """Smoke-test ``es_query`` against the tractate doc with one fixed
    Chinese search phrase, printing the ids of the first 10 level-3 hits."""
    search_term = "眶隔脂肪释放术"
    match_clause = {
        "multi_match": {
            "fields": ["content^1", "author^1", "tractate_tag_name^1", "tractate_tag_name_content^1"],
            "operator": "and",
            "type": "cross_fields",
            "query": search_term,
        }
    }
    # Only online, level-3, status-3 tractates that match the phrase.
    must_filters = [
        match_clause,
        {"term": {"is_online": True}},
        {"term": {"content_level": 3}},
        {"term": {"status": "3"}},
    ]
    body = {
        "query": {"filtered": {"filter": {"bool": {"must": must_filters}}}},
        "_source": {"include": "id"},
    }
    print(es_query('tractate', body, 0, 10))
if __name__ == "__main__":
    # Run each export job in sequence; each writes its own CSV.
    answer()
    topic()
    print("topic")
    question()
    test_topic()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment