Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
F
ffm-baseline
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ML
ffm-baseline
Commits
2daf15e5
Commit
2daf15e5
authored
Sep 05, 2019
by
张彦钊
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
修改rerank文件每次写入的数量
parent
2372f9e4
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
801 additions
and
9 deletions
+801
-9
rerank_esmm.py
eda/esmm/Model_pipline/rerank_esmm.py
+1
-1
master_content.py
master_content.py
+791
-0
monitor.py
monitor.py
+9
-8
No files found.
eda/esmm/Model_pipline/rerank_esmm.py
View file @
2daf15e5
...
...
@@ -184,7 +184,7 @@ if __name__ == "__main__":
print
(
len
(
users_list
))
name_tag
=
get_searchworlds_to_tagid
()
n
=
10
00
n
=
5
00
split_users_list
=
[
users_list
[
i
:
i
+
n
]
for
i
in
range
(
0
,
len
(
users_list
),
n
)]
for
child_users_list
in
split_users_list
:
total_samples
=
list
()
...
...
master_content.py
0 → 100644
View file @
2daf15e5
from
itertools
import
chain
,
islice
,
cycle
import
datetime
from
collections
import
Counter
from
gm_types.gaia
import
DIARY_ORDER_TYPE
from
gm_types.doris
import
ANSWER_SORT_TYPE
from
gm_types.doris
import
ARTICLE_SORT_TYPE
from
gm_types.mimas
import
CONTENT_CLASS
from
gm_types.doris
import
CARD_TYPE
from
gm_types.gaia
import
CITY_LEVEL
from
gm_rpcd.all
import
bind
import
traceback
from
search.utils.diary
import
recall_diary
from
search.utils.answer
import
recall_answers
from
search.utils.article
import
recall_articles
from
gm_rpcd.all
import
context
from
libs.algorithms
import
drop_dup
from
libs.cache
import
redis_client
from
libs.error
import
logging_exception
from
extend.models.gaia
import
City
,
CityScale
from
extend.models.gold
import
(
QAQueue
,
WikiQueue
,
IconQueue
,
UserTopicQueue
,
DoctorTopicQueue
,
DiaryQueue
,
ArticleQueue
,
AnswerQueue
,
DeviceQAQueue
,
DeviceIconQueue
,
DeviceUserTopicQueue
,
DeviceDoctorTopicQueue
,
DeviceAnswerQueue
,
DeviceArticleQueue
,
DeviceDiaryQueue
,
QuestionQueue
,
DeviceQuestionQueue
)
import
logging
import
redis
import
json
from
django.conf
import
settings
import
traceback
MAX_LOAD
=
200
logger
=
logging
.
getLogger
(
__name__
)
@bind
(
"doris/recommend/get_diaries"
)
def
get_diaries
(
tags
,
city
,
offset
=
0
,
size
=
10
,
city_tag_id
=
None
):
# NOTE: city as city id
sort_params
=
{}
if
city_tag_id
:
sort_params
[
"user_city_tag_id"
]
=
city_tag_id
elif
city
:
try
:
x
=
City
.
objects
.
get
(
id
=
city
)
sort_params
[
"user_city_tag_id"
]
=
x
.
tag_id
except
City
.
DoesNotExist
:
pass
filters
=
{
"is_sink"
:
False
,
"has_before_cover"
:
True
,
"has_after_cover"
:
True
,
"content_level_is_good"
:
True
}
if
tags
:
filters
[
"closure_tag_ids"
]
=
tags
tail
=
offset
+
size
diaries_ids
=
[]
if
tail
<
MAX_LOAD
:
diaries
=
recall_diary
(
None
,
0
,
200
,
filters
,
DIARY_ORDER_TYPE
.
RECOMMEND
,
sort_params
,
fields
=
[
"id"
,
"user.id"
])
diaries_items
=
[(
diary
[
'id'
],
diary
[
'user'
][
'id'
])
for
diary
in
diaries
]
drop_dup_diaries
=
drop_dup
(
diaries_items
)
drop_dup_size
=
len
(
drop_dup_diaries
)
if
tail
<=
drop_dup_size
:
diaries_ids
=
[
item
[
0
]
for
item
in
drop_dup_diaries
[
offset
:
tail
]]
if
len
(
diaries_ids
)
==
0
:
# 如果头200条去重结束 后面的排序不去重
diaries
=
recall_diary
(
None
,
offset
,
size
,
filters
,
DIARY_ORDER_TYPE
.
RECOMMEND
,
sort_params
,
fields
=
[
"id"
])
diaries_ids
=
[
diary
[
'id'
]
for
diary
in
diaries
]
return
{
"diaries_ids"
:
diaries_ids
}
@bind
(
"doris/recommend/get_articles"
)
def
get_articles
(
tags
,
offset
=
0
,
size
=
10
):
filters
=
{
"content_level"
:
[
CONTENT_CLASS
.
EXCELLENT
,
CONTENT_CLASS
.
FINE
]
}
if
tags
:
filters
[
"tag_ids"
]
=
tags
articles
=
recall_articles
(
None
,
offset
,
size
,
filters
,
ARTICLE_SORT_TYPE
.
RECOMMEND
,
{})
article_ids
=
[
article
[
'id'
]
for
article
in
articles
]
return
{
"article_ids"
:
article_ids
}
@bind
(
"doris/recommend/get_answers"
)
def
get_answers
(
tags
,
offset
=
0
,
size
=
10
):
filters
=
{
"content_level"
:
[
CONTENT_CLASS
.
EXCELLENT
,
CONTENT_CLASS
.
FINE
]
}
if
tags
:
filters
[
"tag_ids"
]
=
tags
tail
=
offset
+
size
answer_ids
=
[]
if
tail
<
MAX_LOAD
:
answers
=
recall_answers
(
None
,
0
,
MAX_LOAD
,
filters
,
ANSWER_SORT_TYPE
.
RECOMMEND
,
{},
fields
=
[
"id"
,
"user_id"
])
answers
=
filter
(
lambda
answer
:
"id"
in
answer
and
"user_id"
in
answer
,
answers
)
answer_items
=
[(
answer
[
"id"
],
answer
[
"user_id"
])
for
answer
in
answers
]
drop_dup_answers
=
drop_dup
(
answer_items
)
if
tail
<=
len
(
drop_dup_answers
):
answer_ids
=
[
item
[
0
]
for
item
in
drop_dup_answers
[
offset
:
tail
]]
if
len
(
answer_ids
)
==
0
:
answers
=
recall_answers
(
None
,
offset
,
size
,
filters
,
ANSWER_SORT_TYPE
.
RECOMMEND
,
{})
answer_ids
=
[
answer
[
'id'
]
for
answer
in
answers
]
return
{
"answer_ids"
:
answer_ids
}
@bind
(
'doris/recommend/icon'
)
def
fetch_icon
(
device_id
,
size
):
try
:
card_type
=
"icon"
try
:
que
=
DeviceIconQueue
.
objects
.
get
(
device_id
=
device_id
)
except
DeviceIconQueue
.
DoesNotExist
:
que
=
IconQueue
.
objects
.
last
()
if
not
que
:
return
{
"icon"
:
[]}
que
=
list
(
filter
(
None
,
que
.
queue
.
split
(
','
)))
# adjust args.
cursor
=
0
cursor
=
int
(
cursor
)
%
len
(
que
)
size
=
min
(
size
,
len
(
que
))
data
=
list
(
islice
(
cycle
(
que
),
cursor
,
cursor
+
size
))
return
{
card_type
:
list
(
map
(
int
,
data
))}
except
:
logging_exception
()
return
{
"icon"
:
[]}
@bind
(
'doris/recommend/homepage_polymer'
)
def
fetch_polymer_ids
(
device_id
,
size
):
try
:
card_type
=
"polymer_ids"
try
:
que
=
DeviceIconQueue
.
objects
.
get
(
device_id
=
device_id
)
except
DeviceIconQueue
.
DoesNotExist
:
que
=
IconQueue
.
objects
.
last
()
if
not
que
:
return
{
"polymer_ids"
:
[]}
que
=
list
(
filter
(
None
,
que
.
queue
.
split
(
','
)))
# adjust args.
cursor
=
0
cursor
=
int
(
cursor
)
%
len
(
que
)
size
=
min
(
size
,
len
(
que
))
data
=
list
(
islice
(
cycle
(
que
),
cursor
,
cursor
+
size
))
return
{
card_type
:
list
(
map
(
int
,
data
))}
except
:
logging_exception
()
return
{
"polymer_ids"
:
[]}
@bind
(
'doris/recommend/feed'
)
def
recommend_feed
(
device_id
,
card_type
,
city_id
,
size
):
try
:
return
RecommendFeed
.
dispatch
(
device_id
,
card_type
,
city_id
,
size
)
except
:
logging_exception
()
return
{
card_type
:
[]}
class
RecommendFeed
:
@classmethod
def
dispatch
(
cls
,
device_id
,
card_type
,
city_id
,
size
):
data
=
[]
if
card_type
==
CARD_TYPE
.
QA
:
data
=
cls
.
fetch_qa
(
device_id
,
card_type
,
size
)
elif
card_type
==
CARD_TYPE
.
ANSWER
:
data
=
cls
.
fetch_answer
(
device_id
,
card_type
,
size
)
data
=
list
(
map
(
int
,
data
))
elif
card_type
==
CARD_TYPE
.
ARTICLE
:
data
=
cls
.
fetch_article
(
device_id
,
card_type
,
size
)
data
=
list
(
map
(
int
,
data
))
elif
card_type
==
CARD_TYPE
.
QUESTION
:
data
=
cls
.
fetch_question
(
device_id
,
card_type
,
size
)
data
=
list
(
map
(
int
,
data
))
elif
card_type
==
CARD_TYPE
.
DIARY
:
data
=
cls
.
fetch_diary
(
device_id
,
card_type
,
city_id
,
size
)
elif
card_type
==
CARD_TYPE
.
USERTOPIC
:
data
=
cls
.
fetch_user_topic
(
device_id
,
card_type
,
size
)
elif
card_type
==
CARD_TYPE
.
DOCTORTOPIC
:
data
=
cls
.
fetch_doctor_topic
(
device_id
,
card_type
,
size
)
data
=
list
(
map
(
int
,
data
))
elif
card_type
==
CARD_TYPE
.
ENCYCLOPEDIA
:
data
=
cls
.
fetch_wiki
(
device_id
,
card_type
,
size
)
return
{
card_type
:
data
}
@staticmethod
def
current_date
():
return
datetime
.
datetime
.
now
()
.
strftime
(
'
%
Y-
%
m-
%
d'
)
@staticmethod
def
fetch_question
(
device_id
,
card_type
,
size
):
key
=
'{device_id}-{card_type}-{date}'
.
format
(
device_id
=
device_id
,
card_type
=
card_type
,
date
=
RecommendFeed
.
current_date
())
try
:
que
=
DeviceQuestionQueue
.
objects
.
get
(
device_id
=
device_id
)
except
DeviceQuestionQueue
.
DoesNotExist
:
que
=
QuestionQueue
.
objects
.
last
()
que
=
list
(
filter
(
None
,
que
.
queue
.
split
(
','
)))
# adjust args.
cursor
=
redis_client
.
get
(
key
)
or
0
cursor
=
int
(
cursor
)
%
len
(
que
)
size
=
min
(
size
,
len
(
que
))
redis_client
.
set
(
key
,
cursor
+
size
,
ex
=
24
*
60
*
60
)
return
list
(
islice
(
cycle
(
que
),
cursor
,
cursor
+
size
))
@staticmethod
def
fetch_icon
(
device_id
,
card_type
,
size
):
key
=
'{device_id}-{card_type}-{date}'
.
format
(
device_id
=
device_id
,
card_type
=
card_type
,
date
=
RecommendFeed
.
current_date
())
try
:
que
=
DeviceIconQueue
.
objects
.
get
(
device_id
=
device_id
)
except
DeviceIconQueue
.
DoesNotExist
:
que
=
IconQueue
.
objects
.
last
()
que
=
list
(
filter
(
None
,
que
.
queue
.
split
(
','
)))
# adjust args.
cursor
=
redis_client
.
get
(
key
)
or
0
cursor
=
int
(
cursor
)
%
len
(
que
)
size
=
min
(
size
,
len
(
que
))
redis_client
.
set
(
key
,
cursor
+
size
,
ex
=
24
*
60
*
60
)
return
list
(
islice
(
cycle
(
que
),
cursor
,
cursor
+
size
))
@staticmethod
def
fetch_wiki
(
device_id
,
card_type
,
size
):
try
:
key
=
'{device_id}-{card_type}-{date}'
.
format
(
device_id
=
device_id
,
card_type
=
card_type
,
date
=
RecommendFeed
.
current_date
())
que
=
WikiQueue
.
objects
.
last
()
if
not
que
:
return
[]
# que = list(filter(None, que.queue.split(',')))
que
=
json
.
loads
(
que
.
queue
)
# adjust args.
cursor
=
redis_client
.
get
(
key
)
or
0
cursor
=
int
(
cursor
)
%
len
(
que
)
size
=
min
(
size
,
len
(
que
))
redis_client
.
set
(
key
,
cursor
+
size
,
ex
=
24
*
60
*
60
)
return
list
(
islice
(
cycle
(
que
),
cursor
,
cursor
+
size
))
except
:
logging_exception
()
return
[]
@staticmethod
def
fetch_answer
(
device_id
,
card_type
,
size
):
try
:
key
=
'{device_id}-{card_type}-{date}'
.
format
(
device_id
=
device_id
,
card_type
=
card_type
,
date
=
RecommendFeed
.
current_date
())
try
:
que
=
DeviceAnswerQueue
.
objects
.
get
(
device_id
=
device_id
)
except
DeviceAnswerQueue
.
DoesNotExist
:
que
=
AnswerQueue
.
objects
.
last
()
if
not
que
:
return
[]
que
=
list
(
filter
(
None
,
que
.
queue
.
split
(
','
)))
# adjust args.
cursor
=
redis_client
.
get
(
key
)
or
0
cursor
=
int
(
cursor
)
%
len
(
que
)
size
=
min
(
size
,
len
(
que
))
redis_client
.
set
(
key
,
cursor
+
size
,
ex
=
24
*
60
*
60
)
return
list
(
islice
(
cycle
(
que
),
cursor
,
cursor
+
size
))
except
:
logging_exception
()
return
[]
@staticmethod
def
fetch_qa
(
device_id
,
card_type
,
size
):
try
:
key
=
'{device_id}-{card_type}-{date}'
.
format
(
device_id
=
device_id
,
card_type
=
card_type
,
date
=
RecommendFeed
.
current_date
())
if
(
device_id
!=
'0'
):
search_qa_recommend_key
=
"TS:search_recommend_answer_queue:device_id:"
+
str
(
device_id
)
search_qa_recommend_list
=
list
()
search_cursor_ts
=
0
if
redis_client
.
exists
(
search_qa_recommend_key
):
search_qa_recommend_dict
=
redis_client
.
hgetall
(
search_qa_recommend_key
)
if
b
'cursor'
in
search_qa_recommend_dict
:
search_cursor_ts
=
json
.
loads
(
search_qa_recommend_dict
[
b
'cursor'
])
if
search_cursor_ts
<
10
:
search_qa_recommend_list
=
json
.
loads
(
search_qa_recommend_dict
[
b
'answer_queue'
])
if
search_cursor_ts
<
len
(
search_qa_recommend_list
):
size
=
size
-
1
try
:
que
=
DeviceQAQueue
.
objects
.
get
(
device_id
=
device_id
)
except
DeviceQAQueue
.
DoesNotExist
:
que
=
AnswerQueue
.
objects
.
last
()
if
not
que
:
return
[]
que
=
list
(
filter
(
None
,
que
.
queue
.
split
(
','
)))
# adjust args.
cursor
=
redis_client
.
get
(
key
)
or
0
cursor
=
int
(
cursor
)
%
len
(
que
)
size
=
min
(
size
,
len
(
que
))
# redis_client.set(key, cursor + size, ex=24 * 60 * 60)
data
=
list
(
islice
(
cycle
(
que
),
cursor
,
cursor
+
size
))
data
=
list
(
map
(
int
,
data
))
if
cursor
+
2
*
size
<
len
(
que
):
redis_client
.
set
(
key
,
cursor
+
size
,
ex
=
24
*
60
*
60
)
else
:
try
:
context
.
request_logger
.
app
(
reset_answer_queue
=
True
)
cursor
=
0
redis_client
.
set
(
key
,
cursor
,
ex
=
24
*
60
*
60
)
except
:
redis_client
.
set
(
key
,
cursor
+
size
,
ex
=
24
*
60
*
60
)
if
device_id
!=
'0'
:
if
len
(
search_qa_recommend_list
)
>
0
and
search_cursor_ts
<
len
(
search_qa_recommend_list
):
queue
=
search_qa_recommend_list
[
search_cursor_ts
:
search_cursor_ts
+
1
]
queue
.
extend
(
data
)
data
=
queue
new_search_cursor
=
search_cursor_ts
+
1
redis_client
.
hset
(
search_qa_recommend_key
,
'cursor'
,
new_search_cursor
)
redis_client
.
expire
(
search_qa_recommend_key
,
30
*
24
*
60
*
60
)
read_qa_key
=
"TS:recommend_answer_set:device_id:"
+
str
(
device_id
)
if
len
(
data
)
>
0
:
redis_client
.
sadd
(
read_qa_key
,
*
data
)
return
data
except
:
logging_exception
()
return
[]
@staticmethod
def
fetch_article
(
device_id
,
card_type
,
size
):
key
=
'{device_id}-{card_type}-{date}'
.
format
(
device_id
=
device_id
,
card_type
=
card_type
,
date
=
RecommendFeed
.
current_date
())
try
:
que
=
DeviceArticleQueue
.
objects
.
get
(
device_id
=
device_id
)
except
DeviceArticleQueue
.
DoesNotExist
:
que
=
ArticleQueue
.
objects
.
last
()
if
not
que
:
return
[]
que
=
list
(
filter
(
None
,
que
.
queue
.
split
(
','
)))
# adjust args.
cursor
=
redis_client
.
get
(
key
)
or
0
cursor
=
int
(
cursor
)
%
len
(
que
)
size
=
min
(
size
,
len
(
que
))
redis_client
.
set
(
key
,
cursor
+
size
,
ex
=
24
*
60
*
60
)
return
list
(
islice
(
cycle
(
que
),
cursor
,
cursor
+
size
))
@staticmethod
def
fetch_user_topic
(
device_id
,
card_type
,
size
):
try
:
key
=
'{device_id}-{card_type}-{date}'
.
format
(
device_id
=
device_id
,
card_type
=
card_type
,
date
=
RecommendFeed
.
current_date
())
if
(
device_id
!=
'0'
)
and
size
>=
2
:
search_topic_recommend_key
=
"TS:search_recommend_tractate_queue:device_id:"
+
str
(
device_id
)
search_topic_recommend_list
=
list
()
search_cursor_ts
=
0
if
redis_client
.
exists
(
search_topic_recommend_key
):
search_topic_recommend_dict
=
redis_client
.
hgetall
(
search_topic_recommend_key
)
if
b
'cursor'
in
search_topic_recommend_dict
:
search_cursor_ts
=
json
.
loads
(
search_topic_recommend_dict
[
b
'cursor'
])
if
search_cursor_ts
<
30
:
search_topic_recommend_list
=
json
.
loads
(
search_topic_recommend_dict
[
b
'tractate_queue'
])
if
search_cursor_ts
<
len
(
search_topic_recommend_list
):
size
=
size
-
2
try
:
que
=
DeviceUserTopicQueue
.
objects
.
get
(
device_id
=
device_id
)
except
DeviceUserTopicQueue
.
DoesNotExist
:
que
=
UserTopicQueue
.
objects
.
last
()
if
not
que
:
return
[]
que
=
list
(
filter
(
None
,
que
.
queue
.
split
(
','
)))
# adjust args.
cursor
=
redis_client
.
get
(
key
)
or
0
cursor
=
int
(
cursor
)
%
len
(
que
)
size
=
min
(
size
,
len
(
que
))
data
=
list
(
islice
(
cycle
(
que
),
cursor
,
cursor
+
size
))
data
=
list
(
map
(
int
,
data
))
if
cursor
+
2
*
size
<
len
(
que
):
redis_client
.
set
(
key
,
cursor
+
size
,
ex
=
24
*
60
*
60
)
else
:
try
:
context
.
request_logger
.
app
(
reset_queue
=
True
)
cursor
=
0
redis_client
.
set
(
key
,
cursor
,
ex
=
24
*
60
*
60
)
except
:
redis_client
.
set
(
key
,
cursor
+
size
,
ex
=
24
*
60
*
60
)
if
device_id
!=
'0'
and
size
>=
2
:
if
len
(
search_topic_recommend_list
)
>
0
and
search_cursor_ts
<
len
(
search_topic_recommend_list
):
queue
=
search_topic_recommend_list
[
search_cursor_ts
:
search_cursor_ts
+
2
]
queue
.
extend
(
data
)
data
=
queue
new_search_cursor
=
search_cursor_ts
+
2
redis_client
.
hset
(
search_topic_recommend_key
,
'cursor'
,
new_search_cursor
)
redis_client
.
expire
(
search_topic_recommend_key
,
30
*
24
*
60
*
60
)
read_topic_key
=
"TS:recommend_tractate_set:device_id:"
+
str
(
device_id
)
if
len
(
data
)
>
0
:
redis_client
.
sadd
(
read_topic_key
,
*
data
)
return
data
except
:
logging_exception
()
return
[]
@staticmethod
def
fetch_doctor_topic
(
device_id
,
card_type
,
size
):
try
:
key
=
'{device_id}-{card_type}-{date}'
.
format
(
device_id
=
device_id
,
card_type
=
card_type
,
date
=
RecommendFeed
.
current_date
())
try
:
que
=
DeviceDoctorTopicQueue
.
objects
.
get
(
device_id
=
device_id
)
except
DeviceDoctorTopicQueue
.
DoesNotExist
:
que
=
DoctorTopicQueue
.
objects
.
last
()
if
not
que
:
return
[]
que
=
list
(
filter
(
None
,
que
.
queue
.
split
(
','
)))
# adjust args.
cursor
=
redis_client
.
get
(
key
)
or
0
cursor
=
int
(
cursor
)
%
len
(
que
)
size
=
min
(
size
,
len
(
que
))
redis_client
.
set
(
key
,
cursor
+
size
,
ex
=
24
*
60
*
60
)
return
list
(
islice
(
cycle
(
que
),
cursor
,
cursor
+
size
))
except
:
logging_exception
()
return
[]
@classmethod
def
get_gm_kv_ins
(
cls
,
redis_ip
,
redis_port
,
redis_db
,
redis_password
=
""
):
try
:
if
len
(
redis_password
)
==
0
:
cli_ins
=
redis
.
Redis
(
host
=
redis_ip
,
port
=
redis_port
,
db
=
redis_db
,
socket_timeout
=
2
)
else
:
cli_ins
=
redis
.
Redis
(
host
=
redis_ip
,
port
=
redis_port
,
db
=
redis_db
,
password
=
redis_password
,
socket_timeout
=
2
)
cli_ins
.
ping
()
return
cli_ins
except
:
return
None
@classmethod
def
fetch_diary_queue_data
(
cls
,
city_id
,
device_id
=
None
):
local
=
list
()
nearby
=
list
()
nation
=
list
()
megacity
=
list
()
use_city_id
=
city_id
try
:
gm_kv_ins
=
None
for
gm_kv_host_item
in
settings
.
GM_KV_HOSTS
:
gm_kv_ins
=
cls
.
get_gm_kv_ins
(
redis_ip
=
gm_kv_host_item
[
"host"
],
redis_port
=
gm_kv_host_item
[
"port"
],
redis_db
=
gm_kv_host_item
[
"db"
],
redis_password
=
gm_kv_host_item
[
"password"
])
if
gm_kv_ins
:
break
specify_city_id_key
=
"diary_queue:city_id:"
+
use_city_id
world_city_id_key
=
"diary_queue:city_id:world"
if
device_id
is
not
None
:
specify_city_id_key
=
"device_diary_queue:device_id:"
+
device_id
+
":city_id:"
+
use_city_id
city_val_dict
=
gm_kv_ins
.
hgetall
(
specify_city_id_key
)
if
len
(
city_val_dict
)
==
0
:
city_val_dict
=
gm_kv_ins
.
hgetall
(
world_city_id_key
)
use_city_id
=
"world"
if
b
"native_queue"
in
city_val_dict
and
city_val_dict
[
b
"native_queue"
]:
local
=
list
(
filter
(
None
,
city_val_dict
[
b
"native_queue"
]
.
split
(
b
","
)))
if
b
"nearby_queue"
in
city_val_dict
and
city_val_dict
[
b
"nearby_queue"
]:
nearby
=
list
(
filter
(
None
,
city_val_dict
[
b
"nearby_queue"
]
.
split
(
b
","
)))
if
b
"nation_queue"
in
city_val_dict
and
city_val_dict
[
b
"nation_queue"
]:
nation
=
list
(
filter
(
None
,
city_val_dict
[
b
"nation_queue"
]
.
split
(
b
","
)))
if
b
"megacity_queue"
in
city_val_dict
and
city_val_dict
[
b
"megacity_queue"
]:
megacity
=
list
(
filter
(
None
,
city_val_dict
[
b
"megacity_queue"
]
.
split
(
b
","
)))
return
(
local
,
nearby
,
nation
,
megacity
,
use_city_id
)
except
:
logging_exception
()
logger
.
error
(
"catch exception,err_log:
%
s"
%
traceback
.
format_exc
())
qs
=
DiaryQueue
.
objects
.
filter
(
city_id__in
=
[
city_id
,
'world'
])
# Assume that world queue must exist.
if
len
(
qs
)
==
1
:
obj
=
qs
[
0
]
else
:
obj
=
qs
[
0
]
if
qs
[
0
]
.
city_id
==
city_id
else
qs
[
1
]
if
obj
.
native_queue
:
local
=
list
(
filter
(
None
,
obj
.
native_queue
.
split
(
','
)))
if
obj
.
nearby_queue
:
nearby
=
list
(
filter
(
None
,
obj
.
nearby_queue
.
split
(
','
)))
if
obj
.
nation_queue
:
nation
=
list
(
filter
(
None
,
obj
.
nation_queue
.
split
(
','
)))
if
obj
.
megacity_queue
:
megacity
=
list
(
filter
(
None
,
obj
.
megacity_queue
.
split
(
','
)))
use_city_id
=
obj
.
city_id
if
obj
else
use_city_id
return
(
local
,
nearby
,
nation
,
megacity
,
use_city_id
)
@classmethod
def
fetch_device_diary_queue_data
(
cls
,
city_id
,
device_id
):
local
=
list
()
nearby
=
list
()
nation
=
list
()
megacity
=
list
()
use_city_id
=
city_id
try
:
gm_kv_ins
=
None
for
gm_kv_host_item
in
settings
.
GM_KV_HOSTS
:
gm_kv_ins
=
cls
.
get_gm_kv_ins
(
redis_ip
=
gm_kv_host_item
[
"host"
],
redis_port
=
gm_kv_host_item
[
"port"
],
redis_db
=
gm_kv_host_item
[
"db"
],
redis_password
=
gm_kv_host_item
[
"password"
])
if
gm_kv_ins
:
break
specify_city_id_key
=
"device_diary_queue:device_id:"
+
device_id
+
":city_id:"
+
use_city_id
city_val_dict
=
gm_kv_ins
.
hgetall
(
specify_city_id_key
)
if
b
"native_queue"
in
city_val_dict
and
city_val_dict
[
b
"native_queue"
]:
local
=
list
(
filter
(
None
,
city_val_dict
[
b
"native_queue"
]
.
split
(
b
","
)))
if
b
"nearby_queue"
in
city_val_dict
and
city_val_dict
[
b
"nearby_queue"
]:
nearby
=
list
(
filter
(
None
,
city_val_dict
[
b
"nearby_queue"
]
.
split
(
b
","
)))
if
b
"nation_queue"
in
city_val_dict
and
city_val_dict
[
b
"nation_queue"
]:
nation
=
list
(
filter
(
None
,
city_val_dict
[
b
"nation_queue"
]
.
split
(
b
","
)))
if
b
"megacity_queue"
in
city_val_dict
and
city_val_dict
[
b
"megacity_queue"
]:
megacity
=
list
(
filter
(
None
,
city_val_dict
[
b
"megacity_queue"
]
.
split
(
b
","
)))
return
(
local
,
nearby
,
nation
,
megacity
,
use_city_id
)
except
:
logging_exception
()
logger
.
error
(
"catch exception,err_log:
%
s"
%
traceback
.
format_exc
())
obj
=
DeviceDiaryQueue
.
objects
.
filter
(
device_id
=
device_id
,
city_id
=
city_id
)
.
first
()
if
obj
and
obj
.
native_queue
:
local
=
list
(
filter
(
None
,
obj
.
native_queue
.
split
(
','
)))
if
obj
and
obj
.
nearby_queue
:
nearby
=
list
(
filter
(
None
,
obj
.
nearby_queue
.
split
(
','
)))
if
obj
and
obj
.
nation_queue
:
nation
=
list
(
filter
(
None
,
obj
.
nation_queue
.
split
(
','
)))
if
obj
and
obj
.
megacity_queue
:
megacity
=
list
(
filter
(
None
,
obj
.
megacity_queue
.
split
(
','
)))
use_city_id
=
obj
.
city_id
if
obj
else
use_city_id
return
(
local
,
nearby
,
nation
,
megacity
,
use_city_id
)
@classmethod
def
fetch_diary
(
cls
,
device_id
,
card_type
,
city_id
,
size
):
# first, we fetch data from personal-queue city-queue, if not both, get data
# from world queue.
user_portrait_diary_part_list
=
list
()
click_diary_size
=
1
search_diary_size
=
4
if
device_id
!=
'0'
:
user_portrait_diary_key
=
'user_portrait_recommend_diary_queue:device_id:
%
s:
%
s'
%
(
device_id
,
datetime
.
datetime
.
now
()
.
strftime
(
'
%
Y-
%
m-
%
d'
))
if
redis_client
.
exists
(
user_portrait_diary_key
):
user_portrait_diary_dict
=
redis_client
.
hgetall
(
user_portrait_diary_key
)
user_portrait_cursor
=
str
(
user_portrait_diary_dict
[
b
'cursor'
],
encoding
=
'utf-8'
)
if
user_portrait_cursor
==
'0'
:
if
b
'len_cursor'
in
user_portrait_diary_dict
.
keys
():
user_portrait_diary_list
=
json
.
loads
(
user_portrait_diary_dict
[
b
'diary_queue'
])
len_cursor
=
str
(
user_portrait_diary_dict
[
b
'len_cursor'
],
encoding
=
'utf-8'
)
len_cursor
=
int
(
len_cursor
)
if
len
(
user_portrait_diary_list
)
-
len_cursor
>
size
:
user_portrait_diary_part_list
=
user_portrait_diary_list
[
len_cursor
:
len_cursor
+
size
]
redis_client
.
hset
(
user_portrait_diary_key
,
'len_cursor'
,
len_cursor
+
size
)
size
=
0
else
:
user_portrait_diary_list
=
json
.
loads
(
user_portrait_diary_dict
[
b
'diary_queue'
])
diary_list_len
=
len
(
user_portrait_diary_list
)
-
len_cursor
size
=
size
-
diary_list_len
user_portrait_diary_part_list
=
user_portrait_diary_list
[
len_cursor
:
len_cursor
+
diary_list_len
]
redis_client
.
hset
(
user_portrait_diary_key
,
'len_cursor'
,
len_cursor
+
diary_list_len
)
user_portrait_cursor
=
int
(
user_portrait_cursor
)
+
1
redis_client
.
hset
(
user_portrait_diary_key
,
'cursor'
,
user_portrait_cursor
)
else
:
user_portrait_diary_part_list
=
json
.
loads
(
user_portrait_diary_dict
[
b
'diary_queue'
])
size
=
size
-
len
(
user_portrait_diary_part_list
)
user_portrait_cursor
=
int
(
user_portrait_cursor
)
+
1
redis_client
.
hset
(
user_portrait_diary_key
,
'cursor'
,
user_portrait_cursor
)
try
:
# obj = DeviceDiaryQueue.objects.filter(device_id=device_id, city_id=city_id).first()
(
local
,
nearby
,
nation
,
megacity
,
city_id
)
=
cls
.
fetch_device_diary_queue_data
(
city_id
,
device_id
)
if
len
(
local
)
==
0
and
len
(
nearby
)
==
0
and
len
(
nation
)
==
0
and
len
(
megacity
)
==
0
:
(
local
,
nearby
,
nation
,
megacity
,
city_id
)
=
cls
.
fetch_diary_queue_data
(
city_id
)
# if not obj:
# (local, nearby, nation, megacity,city_id) = cls.fetch_diary_queue_data(city_id)
# else:
# local = list(filter(None, obj.native_queue.split(','))) if obj.native_queue else []
# nearby = list(filter(None, obj.nearby_queue.split(','))) if obj.nearby_queue else []
# nation = list(filter(None, obj.nation_queue.split(','))) if obj.nation_queue else []
# megacity = list(filter(None, obj.megacity_queue.split(','))) if obj.megacity_queue else []
except
:
logging_exception
()
(
local
,
nearby
,
nation
,
megacity
,
city_id
)
=
cls
.
fetch_diary_queue_data
(
city_id
)
if
(
device_id
!=
'0'
):
search_diary_recommend_key
=
"TS:search_recommend_diary_queue:device_id:"
+
str
(
device_id
)
search_diary_recommend_list
=
list
()
search_cursor_ts
=
0
if
redis_client
.
exists
(
search_diary_recommend_key
)
and
size
>
3
:
search_diary_recommend_dict
=
redis_client
.
hgetall
(
search_diary_recommend_key
)
if
b
'cursor'
in
search_diary_recommend_dict
:
search_cursor_ts
=
json
.
loads
(
search_diary_recommend_dict
[
b
'cursor'
])
search_diary_recommend_list
=
json
.
loads
(
search_diary_recommend_dict
[
b
'diary_queue'
])
if
search_cursor_ts
+
search_diary_size
<
len
(
search_diary_recommend_list
)
:
size
=
size
-
search_diary_size
if
(
device_id
!=
'0'
)
:
diary_recommend_key
=
"TS:recommend_diary_queue:device_id:"
+
str
(
device_id
)
diary_recommend_list
=
list
()
if
redis_client
.
exists
(
diary_recommend_key
)
and
size
>
0
:
diary_recommend_dict
=
redis_client
.
hgetall
(
diary_recommend_key
)
diary_recommend_list
=
json
.
loads
(
diary_recommend_dict
[
b
'diary_queue'
])
if
len
(
diary_recommend_list
)
>
0
:
size
=
size
-
click_diary_size
key
=
'{device_id}-{city_id}-{date}'
.
format
(
device_id
=
device_id
,
city_id
=
city_id
,
date
=
RecommendFeed
.
current_date
())
# strategy rule: when user refresh over 30 loadings, reset native nearby nation queue cursor.
counter_key
=
key
+
'-counter_v1'
counter
=
redis_client
.
incr
(
counter_key
)
if
counter
==
1
:
redis_client
.
expire
(
counter_key
,
24
*
60
*
60
)
cursor_key
=
key
+
'-cursor_v1'
cursor
=
redis_client
.
get
(
cursor_key
)
or
b
'0-0-0-0'
# if counter > 30:
# cursor = b'0-0-0-0'
# redis_client.delete(counter_key)
cx
,
cy
,
cm
,
cz
=
map
(
int
,
cursor
.
split
(
b
'-'
))
x
,
y
,
m
,
z
=
cls
.
get_city_scale
(
city_id
)
data
,
ncx
,
ncy
,
ncm
,
ncz
=
cls
.
get_scale_data
(
local
,
nearby
,
nation
,
megacity
,
cx
,
cy
,
cm
,
cz
,
x
,
y
,
z
,
m
,
size
)
if
ncx
==
cx
and
ncy
==
cy
:
# native queue and nearby queue
logger
.
info
(
"diary queue reach end,cx:
%
d,cy:
%
d,cm:
%
d,cz:
%
d"
,
cx
,
cy
,
cm
,
cz
)
# redis_client.delete(counter_key)
# data, ncx, ncy, ncm, ncz = cls.get_scale_data(
# local, nearby, nation, megacity,
# 0, 0, 0, 0,
# x, y, z, m, size
# )
ncx
=
ncy
=
ncm
=
ncz
=
0
val
=
'-'
.
join
(
map
(
str
,
[
ncx
,
ncy
,
ncm
,
ncz
]))
redis_client
.
set
(
cursor_key
,
val
,
ex
=
24
*
60
*
60
)
data
=
list
(
map
(
int
,
data
))
if
device_id
!=
'0'
:
if
search_cursor_ts
<
len
(
search_diary_recommend_list
)
-
search_diary_size
:
queue
=
search_diary_recommend_list
[
search_cursor_ts
:
search_cursor_ts
+
search_diary_size
]
queue
.
extend
(
data
)
data
=
queue
new_search_cursor
=
search_cursor_ts
+
search_diary_size
redis_client
.
hset
(
search_diary_recommend_key
,
'cursor'
,
new_search_cursor
)
redis_client
.
expire
(
search_diary_recommend_key
,
30
*
24
*
60
*
60
)
if
len
(
diary_recommend_list
)
>
0
:
diary_id
=
diary_recommend_list
.
pop
(
0
)
data
.
insert
(
0
,
diary_id
)
if
len
(
diary_recommend_list
)
>
0
:
diary_recommend_list_json
=
json
.
dumps
(
diary_recommend_list
)
redis_client
.
hset
(
diary_recommend_key
,
'diary_queue'
,
diary_recommend_list_json
)
redis_client
.
expire
(
diary_recommend_key
,
30
*
24
*
60
*
60
)
else
:
redis_client
.
delete
(
diary_recommend_key
)
if
len
(
user_portrait_diary_part_list
)
>
0
:
user_portrait_diary_part_list
.
extend
(
data
)
data
=
user_portrait_diary_part_list
#已读
read_diary_key
=
"TS:recommend_diary_set:device_id:"
+
str
(
device_id
)
if
len
(
data
)
>
0
:
redis_client
.
sadd
(
read_diary_key
,
*
data
)
return
data
@staticmethod
def
get_scale_data
(
local
,
nearby
,
nation
,
megacity
,
cx
,
cy
,
cm
,
cz
,
x
,
y
,
z
,
m
,
size
):
"""
:param local: local diary queue
:param nearby: nearby diary queue
:param nation: nation diary queue
:param megacity: megacity diary queue
:param cx: seen local diary offset
:param cy: seen nearby diary offset
:param cz: seen nation diary offset
:param cm: seen megacity diary offset
:param x: local diary scale factor
:param y: nearby diary scale factor
:param z: nation diary scale factor
:param m: megacity diary scale factor
:param size: nubmer of diary
:return:
"""
# 本地 临近 特大城市 全国 四个层级 都按照的是四舍五入取得方式
# 针对出现的问题,本次相应的优化是:
# 1、如果出现两个层级为零,且有剩余坑位时,则按照本地 临近 全国的优先级,先给优先级高且为零的层级一个坑位。
# 2、如果所有层级都非零,且有剩余坑位时,则优先给权重占比大的层级一个坑位。
# 3、如果只有一个层级为零,且有剩余坑位时,则优先填充权重占比大的层级一个坑位。
nx
=
int
(
round
(
x
*
1.0
/
(
x
+
y
+
z
+
m
)
*
size
))
ny
=
int
(
round
(
y
*
1.0
/
(
x
+
y
+
z
+
m
)
*
size
))
nz
=
int
(
round
(
z
*
1.0
/
(
x
+
y
+
z
+
m
)
*
size
))
nm
=
int
(
round
(
m
*
1.0
/
(
x
+
y
+
z
+
m
)
*
size
))
nxyz
=
[
nx
,
ny
,
nm
,
nz
]
xyz
=
[
x
,
y
,
m
,
z
]
counter
=
Counter
([
nx
,
ny
,
nm
,
nz
])
if
counter
[
0
]
==
2
:
nxyz
[
nxyz
.
index
(
0
)]
+=
size
-
sum
(
nxyz
)
else
:
nxyz
[
xyz
.
index
(
max
(
xyz
))]
+=
size
-
sum
(
nxyz
)
nx
,
ny
,
nm
,
nz
=
nxyz
slocal
=
local
[
cx
:
cx
+
nx
]
cx
=
min
(
cx
+
nx
,
len
(
local
))
ny
+=
(
nx
-
len
(
slocal
))
snearby
=
nearby
[
cy
:
cy
+
ny
]
cy
=
min
(
cy
+
ny
,
len
(
nearby
))
nm
+=
(
ny
-
len
(
snearby
))
smegacity
=
megacity
[
cm
:
cm
+
nm
]
cm
=
min
(
cm
+
nm
,
len
(
megacity
))
nz
+=
(
nm
-
len
(
smegacity
))
snation
=
nation
[
cz
:
cz
+
nz
]
cz
=
min
(
cz
+
nz
,
len
(
nation
))
return
chain
(
slocal
,
snearby
,
smegacity
,
snation
),
cx
,
cy
,
cm
,
cz
@staticmethod
def
get_city_scale
(
city_id
):
try
:
c
=
CityScale
.
objects
.
get
(
city_id
=
city_id
)
x
,
y
,
z
,
m
=
c
.
native
,
c
.
nearby
,
c
.
nation
,
c
.
megacity
except
CityScale
.
DoesNotExist
:
try
:
c
=
City
.
objects
.
get
(
id
=
city_id
)
if
c
.
level
in
(
CITY_LEVEL
.
SUPER
,
CITY_LEVEL
.
ONE
):
x
,
y
,
m
,
z
=
4
,
3
,
0
,
3
elif
c
.
level
==
CITY_LEVEL
.
TWO
:
x
,
y
,
m
,
z
=
3
,
3
,
0
,
3
elif
c
.
level
==
CITY_LEVEL
.
THREE
:
x
,
y
,
m
,
z
=
1
,
4
,
0
,
5
else
:
x
,
y
,
m
,
z
=
0
,
0
,
0
,
10
except
City
.
DoesNotExist
:
x
,
y
,
m
,
z
=
0
,
0
,
0
,
10
return
x
,
y
,
m
,
z
monitor.py
View file @
2daf15e5
...
...
@@ -202,16 +202,17 @@ def make_data(device_id,city_id,key_head):
print
(
r
.
hgetall
(
key
))
if
__name__
==
"__main__"
:
native
=
","
.
join
([
str
(
i
)
for
i
in
(
range
(
69
,
7
2
))])
nearby
=
","
.
join
([
str
(
i
)
for
i
in
(
range
(
73
,
7
6
))])
nation
=
","
.
join
([
str
(
i
)
for
i
in
(
range
(
77
,
8
0
))])
megacity
=
","
.
join
([
str
(
i
)
for
i
in
(
range
(
81
,
83
))])
native
=
","
.
join
([
str
(
i
)
for
i
in
(
range
(
100
,
10
2
))])
nearby
=
","
.
join
([
str
(
i
)
for
i
in
(
range
(
102
,
10
6
))])
nation
=
","
.
join
([
str
(
i
)
for
i
in
(
range
(
106
,
11
0
))])
megacity
=
","
.
join
([
str
(
i
)
for
i
in
(
range
(
110
,
118
))])
key_head
=
"device_diary_queue_rerank:device_id:"
key_head
=
"device_diary_queue:device_id:"
make_data
(
"868663038800476"
,
"beijing"
,
key_head
)
device_id
=
"868663038800476"
city_id
=
"beijing"
# key_head = "device_diary_queue:device_id:"
device_id
=
"868663038800471"
make_data
(
device_id
,
"beijing"
,
key_head
)
# device_id = "868663038800476"
city_id
=
"beijing"
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment