ML / ffm-baseline · Commits · af381d3d

Commit af381d3d, authored Sep 03, 2018 by 张彦钊
Commit message: 计算ctr (compute CTR)
Parent: 6560d5e2

Showing 22 changed files with 179 additions and 1765 deletions
config.py                               +24   -12
diaryUpdateOnlineOffline.py             +11   -14
local/ctr.py                            +42    -0
prepareData.py                           +6    -4
restruct/config.py                       +0   -31
restruct/dataProcess.py                  +0  -102
restruct/delete_temp_files.py            +0   -30
restruct/diary-training.py               +0  -165
restruct/diaryCandidateSet.py            +0   -96
restruct/diaryQueueUpdate.py             +0  -302
restruct/diaryTestSet.py                 +0   -50
restruct/diaryTraining.py                +0   -20
restruct/diaryUpdateOnlineOffline.py     +0  -319
restruct/keepProcess.py                  +0   -21
restruct/predictDiary.py                 +0  -118
restruct/prepareData.py                  +0   -38
restruct/train.py                        +0   -30
restruct/userProfile.py                  +0  -111
restruct/utils.py                        +0  -260
train.py                                 +1    -2
userProfile.py                          +67   -33
utils.py                                +28    -7
config.py · View file @ af381d3d

# Online path
DIRECTORY_PATH = '/data/models/'
# The test date must be later than the validation date, because the dataset-splitting code assumes it
# VALIDATION_DATE = '2018-08-05'
# TEST_DATE = '2018-08-06'
# DATA_START_DATE = '2018-07-05'
# DATA_END_DATE = '2018-08-06'

# Local path
LOCAL_DIRCTORY = "/Users/mac/utils/"

# Online active-user table DB
ACTIVE_USER_DB_ONLINE = {"host": '10.66.157.22', "port": 4000, "user": 'root',
                         "passwd": '3SYz54LS9#^9sBvC', "db": 'jerry_test'}
# Offline active-user table DB
ACTIVE_USER_DB_LOCAL = {"host": '192.168.15.12', "port": 4000, "user": 'root', "db": 'jerry_test'}
# Online diary-queue DB
QUEUE_DB_ONLINE = {"host": 'rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', "port": 3306,
                   "user": 'doris', "passwd": 'o5gbA27hXHHm', "db": 'doris_prod'}
# Local diary-queue DB
QUEUE_DB_LOCAL = {"host": 'rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com', "port": 3306,
                  "user": 'work', "passwd": 'workwork', "db": 'doris_test'}
# Online diary-score DB
SCORE_DB_ONLINE = {"host": '10.66.157.22', "port": 4000, "user": 'root',
                   "passwd": '3SYz54LS9#^9sBvC', "db": 'eagle'}
# Local diary-score DB
SCORE_DB_LOCAL = {"host": 'rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com', "port": 3306,
                  "user": 'work', "passwd": 'workwork', "db": 'zhengxing_test'}

MODEL_VERSION = ''
lr = 0.03
l2_lambda = 0.002

...
@@ -18,13 +32,9 @@ ONLINE_EAGLE_HOST = '10.66.157.22'
# IP of the test diary-video host
LOCAL_EAGLE_HOST = "192.168.15.12"
# Local path
LOCAL_DIRCTORY = "/Users/mac/utils/"
# Online diary-queue host
QUEUE_ONLINE_HOST = 'rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com'
# Local diary-queue host
LOCAL_HOST = 'rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com'
...
@@ -34,3 +44,5 @@ LOCAL_HOST = 'rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com'
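The diff below switches get_score over to these dicts field by field. A more compact way to consume them, shown only as a sketch and not what this commit does, is to unpack the dict directly, assuming the keys stay aligned with pymysql.connect's keyword arguments (host/port/user/passwd/db all are accepted by PyMySQL):

import pymysql

from config import SCORE_DB_ONLINE, SCORE_DB_LOCAL

def open_score_db(online: bool):
    # Pick the online or local config and unpack it in one call;
    # works because the dict keys match pymysql.connect's kwargs.
    cfg = SCORE_DB_ONLINE if online else SCORE_DB_LOCAL
    return pymysql.connect(**cfg)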
diaryUpdateOnlineOffline.py · View file @ af381d3d

...
@@ -13,7 +13,7 @@ from userProfile import get_active_users
from sklearn.preprocessing import MinMaxScaler
import time
from config import *
import socket
from utils import judge_online, con_sql


def get_video_id(cache_video_id):
...
@@ -112,18 +112,18 @@ def save_result(queue_name,queue_arg,device_id):
def get_score(queue_arg):
    if flag:
        db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                             passwd='3SYz54LS9#^9sBvC', db='eagle')
        db = pymysql.connect(host=SCORE_DB_ONLINE["host"], port=SCORE_DB_ONLINE["port"],
                             user=SCORE_DB_ONLINE["user"], passwd=SCORE_DB_ONLINE["passwd"],
                             db=SCORE_DB_ONLINE["db"])
    else:
        db = pymysql.connect(host=LOCAL_HOST, port=3306, user='work', passwd='workwork',
                             db='zhengxing_tes')
        db = pymysql.connect(host=SCORE_DB_LOCAL["host"], port=SCORE_DB_LOCAL["port"],
                             user=SCORE_DB_LOCAL["user"], passwd=SCORE_DB_LOCAL["passwd"],
                             db=SCORE_DB_LOCAL["db"])
    cursor = db.cursor()
    # Strip the "diary|" prefix from diary_id
    diary_list = tuple(list(map(lambda x: x[6:], queue_arg[2])))
    sql = "select score,diary_id from biz_feed_diary_score where diary_id in {};".format(diary_list)
    cursor.execute(sql)
    result = cursor.fetchall()
    score_df = pd.DataFrame(list(result)).dropna()
    db.close()
    score_df = con_sql(db, sql)
    print("get score")
    return score_df
...
@@ -285,12 +285,7 @@ def multi_proecess_update(device_id, city_id, data_set_cid,total_video_id):
if __name__ == "__main__":
    warnings.filterwarnings("ignore")
    flag = True
    path = DIRECTORY_PATH
    # The IP below is the local development machine's IP
    if socket.gethostbyname(socket.gethostname()) == '172.30.8.160':
        flag = False
        path = LOCAL_DIRCTORY
    flag, path = judge_online()
    # Cache of diary video ids
    cache_video_id = []
    cache_device_city_list = []
...
@@ -298,6 +293,8 @@ if __name__ == "__main__":
    while True:
        start = time.time()
        device_city_list = get_active_users(flag, path, differ)
        time1 = time.time()
        print("获取用户活跃表耗时:{}秒".format(time1 - start))
        # Skip users already predicted within the last 5 minutes
        device_city_list = list(set(tuple(device_city_list)) - set(tuple(cache_device_city_list)))
        print("device_city_list")
...
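judge_online is imported from utils, whose changes are collapsed in this view, so its body is not shown here. Based on the inline check it replaces, a plausible sketch looks like the following; the IP address, paths and return convention are taken from the removed code, not from the actual utils.py:

import socket

from config import DIRECTORY_PATH, LOCAL_DIRCTORY

def judge_online():
    # Returns (flag, path): flag is True when running on the online server,
    # and path is the matching model/data directory.
    # '172.30.8.160' is the local development machine's IP in the removed code.
    if socket.gethostbyname(socket.gethostname()) == '172.30.8.160':
        return False, LOCAL_DIRCTORY
    return True, DIRECTORY_PATH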
local/ctr.py · 0 → 100644 · View file @ af381d3d

import pandas as pd
import pymysql

df = pd.read_csv(r"/data2/models/2018-09-02predictTail6Unique.csv")
a = eval(df.loc[0, "list"])
a = list(map(lambda x: x[0], a))
print(len(a))
print(a[:2])

cf = pd.read_csv(r"/data2/models/nvwa-2018-09-02predictTail6Unique.csv")
b = eval(cf.loc[0, "list"])
print(len(b))
print(b[:2])

a.extend(b)
print("个数")
print(len(set(a)))
pre_list = list(set(a))
pre_list = list(map(lambda x: "diary|" + x, pre_list))
print(pre_list[:2])

stat_date = "2018-09-02"
cid_type = "diary"
db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                     passwd='3SYz54LS9#^9sBvC', db='jerry_test')

sql = "select count(device_id) from data_feed_exposure2 \
      where stat_date='{}' \
      and cid_type='{}' \
      and device_id in {}".format(stat_date, cid_type, pre_list)
cursor = db.cursor()
print("开始获取")
cursor.execute(sql)
print("成功获取")
click = cursor.fetchall()[0][0]
print(click)

sql = "select count(device_id) from data_feed_exposure2 \
      where stat_date='{}' \
      and cid_type='{}' \
      and device_id in {}".format(stat_date, cid_type, pre_list)
cursor = db.cursor()
print("开始获取")
cursor.execute(sql)
exp = cursor.fetchall()[0][0]
print(exp)

print(click / exp)
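As committed, the script prints click / exp as the CTR of the predicted devices, but both counts query data_feed_exposure2. The sketch below parameterizes the same calculation; drawing the numerator from data_feed_click2 is an assumption about the intent (mirroring the click/exposure table pair used in prepareData.py), not something this file does:

import pymysql

COUNT_SQL = ("select count(device_id) from {table} "
             "where stat_date='{date}' and cid_type='{cid_type}' and device_id in {devices}")

def compute_ctr(db, devices, stat_date, cid_type="diary"):
    cursor = db.cursor()
    counts = {}
    # Numerator from the click table, denominator from the exposure table.
    # data_feed_click2 here is an assumption; the committed script counts
    # data_feed_exposure2 rows for both.
    for name, table in [("click", "data_feed_click2"), ("exposure", "data_feed_exposure2")]:
        cursor.execute(COUNT_SQL.format(table=table, date=stat_date,
                                        cid_type=cid_type, devices=tuple(devices)))
        counts[name] = cursor.fetchall()[0][0]
    return counts["click"] / counts["exposure"]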
prepareData.py · View file @ af381d3d

from utils import con_sql
import datetime
import time
import pymysql


def fetch_data(start_date, end_date):
    # Get device_id from the click table
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct device_id from data_feed_click2"
    click_device_id = con_sql(sql)[0].values.tolist()
    click_device_id = con_sql(db, sql)[0].values.tolist()
    print("成功获取点击表里的device_id")
    # Get the click-table data
    sql = "select cid,device_id,time,stat_date from data_feed_click2 " \
          "where stat_date >= '{0}' and stat_date <= '{1}'".format(start_date, end_date)
    click = con_sql(sql)
    click = con_sql(db, sql)
    click = click.rename(columns={0: "cid", 1: "device_id", 2: "time_date", 3: "stat_date"})
    print("成功获取点击表里的数据")
    # Extract hour from the time feature
...
@@ -25,7 +27,7 @@ def fetch_data(start_date, end_date):
    sql = "select cid,device_id,time,stat_date from data_feed_exposure2 " \
          "where stat_date >= '{0}' and stat_date <= '{1}'".format(start_date, end_date)
    start = time.time()
    exposure = con_sql(sql)
    exposure = con_sql(db, sql)
    end = time.time()
    print("获取曝光表耗时{}分".format((end - start) / 60))
    exposure = exposure.rename(columns={0: "cid", 1: "device_id", 2: "time_date", 3: "stat_date"})
...
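The call sites above now pass an open connection into con_sql(db, sql) instead of the old one-argument con_sql(sql). The new utils.py (+28 -7) is not expanded in this view, so the two-argument version below is a sketch inferred from the old helper in restruct/utils.py, not the actual implementation; it deliberately does not close the connection, since fetch_data reuses the same db across several calls:

import pandas as pd

def con_sql(db, sql):
    # Run a query on an already-open pymysql connection and return a DataFrame;
    # the caller owns (and eventually closes) the connection.
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    return pd.DataFrame(list(result)).dropna()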
restruct/config.py · deleted 100644 → 0 · View file @ 6560d5e2

DIRECTORY_PATH = '/data/models/'
# The test date must be later than the validation date, because the dataset-splitting code assumes it
# VALIDATION_DATE = '2018-08-05'
# TEST_DATE = '2018-08-06'
# DATA_START_DATE = '2018-07-05'
# DATA_END_DATE = '2018-08-06'

MODEL_VERSION = ''
lr = 0.03
l2_lambda = 0.002

# IP of the online diary-video host
ONLINE_EAGLE_HOST = '10.66.157.22'
# Connection parameters of the test diary-video database
VIDEO_DB = {"host": "192.168.15.12", "port": 4000, "user": "root", "db": 'eagle'}
# Local path
LOCAL_DIRCTORY = "/Users/mac/utils/"
# Online diary-queue host
QUEUE_ONLINE_HOST = 'rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com'
# Local diary-queue host
LOCAL_HOST = 'rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com'
restruct/dataProcess.py · deleted 100644 → 0 · View file @ 6560d5e2

import time
from prepareData import fetch_data
from utils import *
import pandas as pd
from config import *
import pickle


def feature_en(data_start_date, data_end_date, validation_date, test_date):
    exposure, click, click_device_id = fetch_data(data_start_date, data_end_date)
    # Take the set difference between the exposure table and the click table
    print("曝光表处理前的样本个数")
    print(exposure.shape)
    exposure = exposure.append(click)
    exposure = exposure.append(click)
    subset = click.columns.tolist()
    exposure = exposure.drop_duplicates(subset=subset, keep=False)
    print("差集后曝光表个数")
    print(exposure.shape)
    exposure = exposure.loc[exposure["device_id"].isin(click_device_id)]
    print("去除未点击用户后曝光表个数")
    print(exposure.shape)
    # Label the samples
    click["y"] = 1
    exposure["y"] = 0
    print("正样本个数")
    print(click.shape[0])
    print("负样本个数")
    print(exposure.shape[0])
    # Merge the click table and the exposure table
    data = click.append(exposure)
    print("点击表和曝光表合并成功")
    data = data.sort_values(by="stat_date", ascending=False)
    test_number = data[data["stat_date"] == test_date].shape[0]
    validation_number = data[data["stat_date"] == validation_date].shape[0]
    data = data.drop("stat_date", axis=1)
    # Features with value 0 are dropped by the ffm format; after the step below no feature value is 0
    data.loc[data["hour"] == 0, ["hour"]] = 24
    data.loc[data["minute"] == 0, ["minute"]] = 60
    data["hour"] = data["hour"].astype("category")
    data["minute"] = data["minute"].astype("category")
    # Persist the candidate cids; used later to filter the prediction candidate set
    data_set_cid = data["cid"].unique()
    cid_df = pd.DataFrame()
    cid_df['cid'] = data_set_cid
    cid_df.to_csv(DIRECTORY_PATH + "train/data_set_cid.csv", index=False)
    print("成功保存data_set_cid")
    # Save device_id so we can check whether a device_id to predict is in this set; if not, skip it
    data_set_device_id = data["device_id"].unique()
    device_id_df = pd.DataFrame()
    device_id_df['device_id'] = data_set_device_id
    device_id_df.to_csv(DIRECTORY_PATH + "train/data_set_device_id.csv", index=False)
    print("成功保存data_set_device_id")
    return data, test_number, validation_number


def ffm_transform(data, test_number, validation_number):
    print("Start ffm transform")
    start = time.time()
    ffm_train = multiFFMFormatPandas()
    data = ffm_train.fit_transform(data, y='y', n=50000, processes=20)
    with open(DIRECTORY_PATH + "train/ffm.pkl", "wb") as f:
        pickle.dump(ffm_train, f)
    print("done transform ffm")
    end = time.time()
    print("ffm转化数据耗时(分):")
    print((end - start) / 60)

    data.to_csv(DIRECTORY_PATH + "total_ffm_data.csv", index=False)
    data = pd.read_csv(DIRECTORY_PATH + "total_ffm_data.csv", header=None)
    print("数据集大小")
    print(data.shape)

    test = data.loc[:test_number]
    print("测试集大小")
    print(test.shape[0])
    test.to_csv(DIRECTORY_PATH + "test_ffm_data.csv", index=False, header=None)
    # Note: the test-set date must be later than the validation date, otherwise the split may go wrong
    validation = data.loc[(test_number + 1):(test_number + validation_number)]
    print("验证集大小")
    print(validation.shape[0])
    validation.to_csv(DIRECTORY_PATH + "validation_ffm_data.csv", index=False, header=None)
    train = data.loc[(test_number + validation_number + 1):]
    print("训练集大小")
    print(train.shape[0])
    # TODO validation date is not the end of train date
    train.to_csv(DIRECTORY_PATH + "train_ffm_data.csv", index=False, header=None)
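ffm_transform writes the data out in libffm's text format, one sample per line: the label followed by field:feature_index:value triples (categorical features get value 1, integer features keep their value). With the hour/minute/cid/device_id features above, a row might look like the lines below; the indices are illustrative only, since they depend on the fitted field_index_ and feature_index_ maps:

1 0:37:1 1:1024:1 2:5:1 3:12:1
0 0:58:1 1:997:1 2:5:1 3:48:1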
restruct/delete_temp_files.py · deleted 100644 → 0 · View file @ 6560d5e2

import os
import time
from config import *


# Periodically delete specific files from a given directory
def remove_files(fileDir):
    for eachFile in os.listdir(fileDir):
        condition_a = os.path.isfile(fileDir + "/" + eachFile)
        condition_b = ("DiaryTop3000.csv" in eachFile) or ("output.txt" in eachFile) or ("feed" in eachFile)
        if condition_a and condition_b:
            ft = os.stat(fileDir + "/" + eachFile)
            ltime = int(ft.st_mtime)
            # Delete files older than 5 minutes
            ntime = int(time.time()) - 5 * 60
            if ltime <= ntime:
                os.remove(fileDir + "/" + eachFile)


def delete_log():
    for eachFile in os.listdir("/tmp"):
        if "xlearn" in eachFile:
            os.remove("/tmp" + "/" + eachFile)


if __name__ == "__main__":
    while True:
        delete_log()
        remove_files(DIRECTORY_PATH + "result")
        print("运行一次")
        time.sleep(5 * 60)
restruct/diary-training.py · deleted 100644 → 0 · View file @ 6560d5e2

import datetime
import pymysql
import pandas as pd
from sklearn.utils import shuffle
import numpy as np
import xlearn as xl


# Fetch data from a database table and convert it to a DataFrame
def con_sql(sql):
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    df = pd.DataFrame(list(result)).dropna()
    db.close()
    return df


# Get device_id from the click table
sql = "select distinct device_id from data_feed_click where cid_type = 'diary'"
click_device_id = con_sql(sql)[0].values.tolist()
print("成功获取点击表里的device_id")

# Get the click-table data
sql = "select cid,device_id,time from data_feed_click where cid_type = 'diary'"
click = con_sql(sql)
click = click.rename(columns={0: "cid", 1: "device_id", 2: "time"})
print("成功获取点击表里的数据")

# Get the exposure-table data
sql = "select cid,device_id,time from data_feed_exposure where cid_type = 'diary'"
exposure = con_sql(sql)
exposure = exposure.rename(columns={0: "cid", 1: "device_id", 2: "time"})
print("成功获取曝光表里的数据")

# Take the set difference between the exposure table and the click table
exposure.append(click)
exposure.append(click)
subset = click.columns.tolist()
exposure = exposure.drop_duplicates(subset=subset, keep=False)
print("成功完成曝光表和点击表的差集合")
exposure = exposure.loc[exposure["device_id"].isin(click_device_id)]

# Label the samples
click["y"] = 1
exposure["y"] = 0
print("成功获取正负样本")

# Merge the click table and the exposure table
data = click.append(exposure)
print("done 合并点击表和曝光表")
print(data.head(2))

# Extract hour and weekday from the time feature
data["hour"] = data["time"].apply(lambda x: datetime.datetime.fromtimestamp(x).hour)
data["weekday"] = data["time"].apply(lambda x: datetime.datetime.fromtimestamp(x).weekday())
# Features with value 0 are dropped by the ffm format; after the step below no feature value is 0
data.loc[data["hour"] == 0] = 24
data.loc[data["weekday"] == 0] = 7
data["hour"] = data["hour"].astype("category")
data["weekday"] = data["weekday"].astype("category")
data = data.drop("time", axis=1)
print("成功从time特征中抽取hour、weekday")
print(data.head(2))

data = shuffle(data)
print("start ffm transform")


# ffm format conversion class
class FFMFormatPandas:
    def __init__(self):
        self.field_index_ = None
        self.feature_index_ = None
        self.y = None

    def fit(self, df, y=None):
        self.y = y
        df_ffm = df[df.columns.difference([self.y])]
        if self.field_index_ is None:
            self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
        if self.feature_index_ is not None:
            last_idx = max(list(self.feature_index_.values()))
        if self.feature_index_ is None:
            self.feature_index_ = dict()
            last_idx = 0
        for col in df.columns:
            vals = df[col].unique()
            for val in vals:
                if pd.isnull(val):
                    continue
                name = '{}_{}'.format(col, val)
                if name not in self.feature_index_:
                    self.feature_index_[name] = last_idx
                    last_idx += 1
            self.feature_index_[col] = last_idx
            last_idx += 1
        return self

    def fit_transform(self, df, y=None):
        self.fit(df, y)
        return self.transform(df)

    def transform_row_(self, row, t):
        ffm = []
        if self.y != None:
            ffm.append(str(row.loc[row.index == self.y][0]))
        if self.y is None:
            ffm.append(str(0))
        for col, val in row.loc[row.index != self.y].to_dict().items():
            col_type = t[col]
            name = '{}_{}'.format(col, val)
            if col_type.kind == 'O':
                ffm.append('{}:{}:1'.format(self.field_index_[col], self.feature_index_[name]))
            elif col_type.kind == 'i':
                ffm.append('{}:{}:{}'.format(self.field_index_[col], self.feature_index_[col], val))
        return ' '.join(ffm)

    def transform(self, df):
        t = df.dtypes.to_dict()
        return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})


ffm_train = FFMFormatPandas()
data = ffm_train.fit_transform(data, y='y')
print("done transform ffm")

n = np.rint(data.shape[0] / 8)
m = np.rint(data.shape[0] * (3 / 8))
# 1/8 of the data is used as the test set
data.loc[:n].to_csv("/home/zhangyanzhao/test.csv", index=False, header=None)
# 1/4 of the data is used as the validation set
data.loc[n + 1:m].to_csv("/home/zhangyanzhao/validation.csv", index=False, header=None)
# The remaining data is used as the training set
data.loc[m + 1:].to_csv("/home/zhangyanzhao/train.csv", index=False, header=None)
# Drop data to save memory
data = data.drop(data.index.tolist())
print("start training")

ffm_model = xl.create_ffm()
ffm_model.setTrain("/home/zhangyanzhao/train.csv")
ffm_model.setValidate("/home/zhangyanzhao/validation.csv")
param = {'task': 'binary', 'lr': 0.2, 'lambda': 0.002, 'metric': 'auc'}
ffm_model.fit(param, '/home/zhangyanzhao/model.out')
ffm_model.setTest("/home/zhangyanzhao/test.csv")
ffm_model.setSigmoid()
ffm_model.predict("/home/zhangyanzhao/model.out", "/home/zhangyanzhao/output.txt")
print("end")
restruct/diaryCandidateSet.py · deleted 100644 → 0 · View file @ 6560d5e2

import pymysql
import pandas as pd
from utils import *
from config import *
import numpy as np
import time


# Candidate cids can only be chosen from the training-set cids
def filter_cid(df):
    data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist()
    if not df.empty:
        df = df.loc[df["cid"].isin(data_set_cid)]
    return df


def get_allCitiesDiaryTop3000():
    # Get the nationwide TOP 3000 diaries by clicks
    sql = "select city_id,cid from data_feed_click2 " \
          "where cid_type = 'diary' group by cid order by max(click_count_choice) desc limit 3000"
    allCitiesTop3000 = con_sql(sql)
    allCitiesTop3000 = allCitiesTop3000.rename(columns={0: "city_id", 1: "cid"})
    allCitiesTop3000 = filter_cid(allCitiesTop3000)
    allCitiesTop3000.to_csv(DIRECTORY_PATH + "diaryTestSet/allCitiesDiaryTop3000.csv", index=False)
    return allCitiesTop3000


def get_cityList():
    # Get the list of all cities
    sql = "select distinct city_id from data_feed_click2"
    cityList = con_sql(sql)
    cityList.to_csv(DIRECTORY_PATH + "diaryTestSet/cityList.csv", index=False)
    cityList = cityList[0].values.tolist()
    return cityList


def get_eachCityDiaryTop3000():
    # Get each city's TOP 3000 diaries by clicks; if fewer than 3000, pad with the nationwide TOP 3000
    cityList = get_cityList()
    allCitiesTop3000 = get_allCitiesDiaryTop3000()
    for i in cityList:
        sql = "select city_id,cid from data_feed_click2 " \
              "where cid_type = 'diary' and city_id = '{0}' group by cid " \
              "order by max(click_count_choice) desc limit 3000".format(i)
        data = con_sql(sql)
        data = data.rename(columns={0: "city_id", 1: "cid"})
        data = filter_cid(data)
        if data.shape[0] < 3000:
            n = 3000 - data.shape[0]
            # Remove this city's diaries from the nationwide TOP 3000
            temp = allCitiesTop3000[allCitiesTop3000["city_id"] != i].loc[:n - 1]
            data = data.append(temp)
        else:
            pass
        file_name = DIRECTORY_PATH + "diaryTestSet/{0}DiaryTop3000.csv".format(i)
        data.to_csv(file_name, index=False)


def pool_method(city, sql, allCitiesTop3000):
    data = con_sql(sql)
    data = data.rename(columns={0: "city_id", 1: "cid"})
    data = filter_cid(data)
    if data.shape[0] < 3000:
        n = 3000 - data.shape[0]
        # Remove this city's diaries from the nationwide TOP 3000
        temp = allCitiesTop3000[allCitiesTop3000["city_id"] != city].loc[:n - 1]
        data = data.append(temp)
    file_name = DIRECTORY_PATH + "diaryTestSet/{0}DiaryTop3000.csv".format(city)
    data.to_csv(file_name, index=False)


# Fetch each city's hot diaries with multiple processes
def multi_get_eachCityDiaryTop3000(processes=8):
    city_list = get_cityList()
    allCitiesTop3000 = get_allCitiesDiaryTop3000()
    pool = Pool(processes)
    for city in city_list:
        sql = "select city_id,cid from data_feed_click2 " \
              "where cid_type = 'diary' and city_id = '{0}' group by cid " \
              "order by max(click_count_choice) desc limit 3000".format(city)
        pool.apply_async(pool_method, (city, sql, allCitiesTop3000,))
    pool.close()
    pool.join()


if __name__ == "__main__":
    start = time.time()
    multi_get_eachCityDiaryTop3000()
    end = time.time()
    print("获取各城市热门日记耗时{}分".format((end - start) / 60))
restruct/diaryQueueUpdate.py · deleted 100644 → 0 · View file @ 6560d5e2

#!/srv/envs/nvwa/bin/python
# -*- coding: utf-8 -*-
import pickle
import xlearn as xl
import pandas as pd
import pymysql
from datetime import datetime
# utils must be imported, otherwise the pickle used by the ffm transform cannot find utils and raises an error
import utils
import warnings
from multiprocessing import Pool
from userProfile import get_active_users
from sklearn.preprocessing import MinMaxScaler
import time
from config import *


def get_video_id(cache_video_id):
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
    cursor = db.cursor()
    sql = "select diary_id from feed_diary_boost;"
    cursor.execute(sql)
    result = cursor.fetchall()
    df = pd.DataFrame(list(result))
    print("videio_id 预览")
    print(df.head(1))
    db.close()
    if df.empty:
        return cache_video_id
    else:
        video_id = df[0].values.tolist()
        return video_id


# Join device_id and city_id onto the corresponding city's hot-diary table.
# Note: the feature order of the prediction set below must match the training set
def feature_en(x_list, device_id):
    data = pd.DataFrame(x_list)
    # The column name must be cid, not diaryid, because the ffm used by the prediction model uses cid
    data = data.rename(columns={0: "cid"})
    data["device_id"] = device_id
    now = datetime.now()
    data["hour"] = now.hour
    data["minute"] = now.minute
    data.loc[data["hour"] == 0, ["hour"]] = 24
    data.loc[data["minute"] == 0, ["minute"]] = 60
    data["hour"] = data["hour"].astype("category")
    data["minute"] = data["minute"].astype("category")
    # y is being predicted, but the ffm transform still needs a y column; it does not affect the prediction
    data["y"] = 0
    # print("done 特征工程")
    return data


# Load ffm.pkl and convert the data above into ffm format
def transform_ffm_format(df, queue_name, device_id):
    with open(DIRECTORY_PATH + "ffm.pkl", "rb") as f:
        ffm_format_pandas = pickle.load(f)
        data = ffm_format_pandas.native_transform(df)
        predict_file_name = DIRECTORY_PATH + "result/{0}_{1}.csv".format(device_id, queue_name)
        data.to_csv(predict_file_name, index=False, header=None)
        # print("done ffm")
        return predict_file_name


def predict(queue_name, queue_arg, device_id):
    data = feature_en(queue_arg[0], device_id)
    data_file_path = transform_ffm_format(data, queue_name, device_id)
    ffm_model = xl.create_ffm()
    ffm_model.setTest(data_file_path)
    ffm_model.setSigmoid()
    ffm_model.predict(DIRECTORY_PATH + "model.out",
                      DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id, queue_name))


def save_result(queue_name, queue_arg, device_id):
    score_df = pd.read_csv(DIRECTORY_PATH + "result/output{0}_{1}.csv".format(device_id, queue_name), header=None)
    mm_scaler = MinMaxScaler()
    mm_scaler.fit(score_df)
    score_df = pd.DataFrame(mm_scaler.transform(score_df))
    score_df = score_df.rename(columns={0: "score"})
    score_df["cid"] = queue_arg[0]
    # Strip the "diary|" prefix from cid
    score_df["cid"] = score_df["cid"].apply(lambda x: x[6:])
    # print("score_df:")
    # print(score_df.head(1))
    # print(score_df.shape)
    if queue_arg[1] != []:
        df_temp = pd.DataFrame(queue_arg[1]).rename(columns={0: "cid"})
        df_temp["score"] = 0
        df_temp = df_temp.sort_index(axis=1, ascending=False)
        df_temp["cid"] = df_temp["cid"].apply(lambda x: x[6:])
        predict_score_df = score_df.append(df_temp)
        return predict_score_df
    else:
        return score_df


def get_score(queue_arg):
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
    cursor = db.cursor()
    # Strip the "diary|" prefix from diary_id
    diary_list = tuple(list(map(lambda x: x[6:], queue_arg[2])))
    sql = "select score,diary_id from biz_feed_diary_score where diary_id in {};".format(diary_list)
    cursor.execute(sql)
    result = cursor.fetchall()
    score_df = pd.DataFrame(list(result)).dropna()
    db.close()
    return score_df


def update_dairy_queue(score_df, predict_score_df, total_video_id):
    diary_id = score_df["cid"].values.tolist()
    if total_video_id != []:
        video_id = list(set(diary_id) & set(total_video_id))
        if len(video_id) > 0:
            not_video = list(set(diary_id) - set(video_id))
            # Turn cid into the index so the scores line up when added
            not_video_df = score_df.loc[score_df["cid"].isin(not_video)].set_index(["cid"])
            not_video_predict_df = predict_score_df.loc[predict_score_df["cid"].isin(not_video)].set_index(["cid"])
            not_video_df["score"] = not_video_df["score"] + not_video_predict_df["score"]
            not_video_df = not_video_df.sort_values(by="score", ascending=False)
            video_df = score_df.loc[score_df["cid"].isin(video_id)].set_index(["cid"])
            video_predict_df = predict_score_df.loc[predict_score_df["cid"].isin(video_id)].set_index(["cid"])
            video_df["score"] = video_df["score"] + video_predict_df["score"]
            video_df = video_df.sort_values(by="score", ascending=False)
            not_video_id = not_video_df.index.tolist()
            video_id = video_df.index.tolist()
            new_queue = not_video_id
            i = 1
            for j in video_id:
                new_queue.insert(i, j)
                i += 5
            # print("分数合并成功")
            return new_queue
        # If there are no video diaries left after the intersection
        else:
            score_df = score_df.set_index(["cid"])
            predict_score_df = predict_score_df.set_index(["cid"])
            score_df["score"] = score_df["score"] + predict_score_df["score"]
            score_df = score_df.sort_values(by="score", ascending=False)
            # print("分数合并成功1")
            return score_df.index.tolist()
    # If total_video_id is an empty list
    else:
        score_df = score_df.set_index(["cid"])
        predict_score_df = predict_score_df.set_index(["cid"])
        score_df["score"] = score_df["score"] + predict_score_df["score"]
        score_df = score_df.sort_values(by="score", ascending=False)
        # print("分数合并成功1")
        return score_df.index.tolist()


def update_sql_dairy_queue(queue_name, diary_id, device_id, city_id):
    db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris',
                         passwd='o5gbA27hXHHm', db='doris_prod')
    cursor = db.cursor()
    id_str = str(diary_id[0])
    for i in range(1, len(diary_id)):
        id_str = id_str + "," + str(diary_id[i])
    sql = "update device_diary_queue set {}='{}' where device_id = '{}' and city_id = '{}'".format \
        (queue_name, id_str, device_id, city_id)
    cursor.execute(sql)
    db.commit()
    db.close()
    print("成功写入diary_id")


def queue_compare(old_list, new_list):
    global update_queue_numbers
    print("更新日记队列总数:{}".format(update_queue_numbers))
    # Strip the "diary|" prefix
    old_list = list(map(lambda x: int(x[6:]), old_list))
    # print("旧表前十个")
    # print(old_list[:10])
    # print("新表前十个")
    # print(new_list[:10])
    temp = list(range(len(old_list)))
    x_dict = dict(zip(old_list, temp))
    temp = list(range(len(new_list)))
    y_dict = dict(zip(new_list, temp))
    i = 0
    for key in x_dict.keys():
        if x_dict[key] != y_dict[key]:
            i += 1
    if i > 0:
        update_queue_numbers += 1
        print("更新日记队列总数:{}".format(update_queue_numbers))
        print("日记队列更新前日记总个数{},位置发生变化个数{},发生变化率{}%".format(len(old_list), i, round(i / len(old_list) * 100), 2))


def get_queue(device_id, city_id, queue_name):
    db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com', port=3306, user='doris',
                         passwd='o5gbA27hXHHm', db='doris_prod')
    cursor = db.cursor()
    sql = "select {} from device_diary_queue " \
          "where device_id = '{}' and city_id = '{}';".format(queue_name, device_id, city_id)
    cursor.execute(sql)
    result = cursor.fetchall()
    df = pd.DataFrame(list(result))
    if df.empty:
        # print("该用户对应的日记为空")
        return False
    else:
        queue_list = df.loc[0, 0].split(",")
        queue_list = list(map(lambda x: "diary|" + str(x), queue_list))
        db.close()
        # print("成功获取queue")
        return queue_list


def pipe_line(queue_name, queue_arg, device_id, total_video_id):
    predict(queue_name, queue_arg, device_id)
    predict_score_df = save_result(queue_name, queue_arg, device_id)
    score_df = get_score(queue_arg)
    if score_df.empty:
        # print("获取的日记列表是空")
        return False
    else:
        score_df = score_df.rename(columns={0: "score", 1: "cid"})
        diary_queue = update_dairy_queue(score_df, predict_score_df, total_video_id)
        return diary_queue


def user_update(device_id, city_id, queue_name, data_set_cid, total_video_id):
    queue_list = get_queue(device_id, city_id, queue_name)
    if queue_list:
        queue_predict = list(set(queue_list) & set(data_set_cid))
        queue_not_predict = list(set(queue_list) - set(data_set_cid))
        queue_arg = [queue_predict, queue_not_predict, queue_list]
        if queue_predict != []:
            diary_queue = pipe_line(queue_name, queue_arg, device_id, total_video_id)
            if diary_queue:
                update_sql_dairy_queue(queue_name, diary_queue, device_id, city_id)
                queue_compare(queue_list, diary_queue)
                # print("更新结束")
            else:
                print("获取的日记列表是空,所以不更新日记队列")
        else:
            print("预测集是空,不需要预测")
    else:
        print("日记队列为空")


def multi_proecess_update(device_id, city_id, data_set_cid, total_video_id):
    queue_name_list = ["native_queue", "nearby_queue", "nation_queue", "megacity_queue"]
    pool = Pool(4)
    for queue_name in queue_name_list:
        pool.apply_async(user_update, (device_id, city_id, queue_name, data_set_cid, total_video_id,))
    pool.close()
    pool.join()


if __name__ == "__main__":
    warnings.filterwarnings("ignore")
    total_number = 0
    # Cache of diary video ids
    cache_video_id = []
    cache_device_city_list = []
    update_queue_numbers = 0
    while True:
        data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist()
        total_video_id = get_video_id(cache_video_id)
        cache_video_id = total_video_id
        device_city_list = get_active_users()
        print("过滤前用户数:{}".format(len(device_city_list)))
        # Skip users already predicted within the last 5 minutes
        device_city_list = list(set(tuple(device_city_list)) - set(tuple(cache_device_city_list)))
        print("过滤后用户数:{}".format(len(device_city_list)))
        print("缓存视频个数:{}".format(len(cache_device_city_list)))
        if datetime.now().minute % 5 == 0:
            cache_device_city_list = []
        if device_city_list != []:
            cache_device_city_list.extend(device_city_list)
            total_number += len(device_city_list)
            print("累计预测用户总数:{}".format(total_number))
            for device_city in device_city_list:
                # start = time.time()
                multi_proecess_update(device_city[0], device_city[1], data_set_cid, total_video_id)
                # end = time.time()
                # print("更新该用户队列耗时{}秒".format((end - start)))
# # TODO Switch user prediction to multiprocessing after going live
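update_dairy_queue above interleaves video diaries back into the ranked non-video list at positions 1, 6, 11, and so on (the insert index starts at 1 and advances by 5). A standalone illustration of that interleaving, with made-up ids:

def interleave_videos(non_video_ids, video_ids, start=1, step=5):
    # Mirrors the insertion loop in update_dairy_queue: each video diary is
    # spliced into the ranked non-video list every `step` positions.
    queue = list(non_video_ids)
    i = start
    for vid in video_ids:
        queue.insert(i, vid)
        i += step
    return queue

# e.g. interleave_videos([1, 2, 3, 4, 5, 6, 7, 8], ['v1', 'v2'])
# -> [1, 'v1', 2, 3, 4, 5, 'v2', 6, 7, 8]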
restruct/diaryTestSet.py · deleted 100644 → 0 · View file @ 6560d5e2

import pymysql
import pandas as pd

db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')


# Fetch data from the database and convert it to a DataFrame
def get_data(sql):
    cursor = db.cursor()
    cursor.execute(sql)
    data = cursor.fetchall()
    data = pd.DataFrame(list(data)).dropna()
    return data


# Get the nationwide TOP 2000 diaries by clicks
sql = "select city_id,cid where cid_type = 'diary' order by click_count_choice desc limit 2000"
allCitiesTop2000 = get_data(sql)
allCitiesTop2000 = allCitiesTop2000.rename(columns={0: "city_id", 1: "cid"})
allCitiesTop2000.to_csv("\home\zhangyanzhao\diaryTestSet\allCitiesTop2000.csv")
print("成功获取全国日记点击量TOP2000")

# Get the list of all cities
sql = "select distinct city_id from data_feed_click"
cityList = get_data(sql)
cityList.to_csv("\home\zhangyanzhao\diaryTestSet\cityList.csv")
cityList = cityList[0].values.tolist()
print("成功获取城市列表")

# Get each city's TOP 2000 diaries by clicks; if fewer than 2000, pad with the nationwide TOP 2000
for i in cityList:
    sql = "select city_id,cid from data_feed_click " \
          "where cid_type = 'diary' and city_id = {0} " \
          "order by click_count_choice desc limit 2000".format(i)
    data = get_data(sql)
    data = data.rename(columns={0: "city_id", 1: "cid"})
    if data.shape[0] < 2000:
        n = 2000 - data.shape[0]
        # Remove this city's diaries from the nationwide TOP 2000
        temp = allCitiesTop2000[allCitiesTop2000["city_id"] != i].loc[:n - 1]
        data = data.append(temp)
    else:
        pass
    file_name = "\home\zhangyanzhao\diaryTestSet\{0}DiaryTop2000.csv".format(i)
    data.to_csv(file_name)

print("end")
restruct/diaryTraining.py · deleted 100644 → 0 · View file @ 6560d5e2

import xlearn as xl
from config import *


def train():
    print("Start training")
    ffm_model = xl.create_ffm()
    ffm_model.setTrain(DIRECTORY_PATH + "train_ffm_data.csv")
    ffm_model.setValidate(DIRECTORY_PATH + "validation_ffm_data.csv")
    # Log save path; without this parameter the log goes to /temp by default, which is against convention
    param = {'task': 'binary', 'lr': lr, 'lambda': l2_lambda, 'metric': 'auc',
             "log": DIRECTORY_PATH + "result"}
    ffm_model.fit(param, DIRECTORY_PATH + "train/model.out")
    print("predicting")
    ffm_model.setTest(DIRECTORY_PATH + "test_ffm_data.csv")
    ffm_model.setSigmoid()
    ffm_model.predict(DIRECTORY_PATH + "train/model.out", DIRECTORY_PATH + "test_set_predict_output.txt")
restruct/diaryUpdateOnlineOffline.py · deleted 100644 → 0 · View file @ 6560d5e2

#!/srv/envs/nvwa/bin/python
# -*- coding: utf-8 -*-
import pickle
import xlearn as xl
import pandas as pd
import pymysql
from datetime import datetime
# utils must be imported, otherwise the pickle used by the ffm transform cannot find utils and raises an error
import utils
import warnings
from multiprocessing import Pool
from userProfile import get_active_users
from sklearn.preprocessing import MinMaxScaler
import time
from config import *
import socket


def get_video_id(cache_video_id):
    if flag:
        db = pymysql.connect(host=ONLINE_EAGLE_HOST, port=4000, user='root',
                             passwd='3SYz54LS9#^9sBvC', db='eagle')
    else:
        # Local database with no password; this may raise an error
        db = pymysql.connect(host=LOCAL_EAGLE_HOST, port=4000, user='root', db='eagle')
    cursor = db.cursor()
    sql = "select diary_id from feed_diary_boost;"
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
        df = pd.DataFrame(list(result))
    except Exception:
        print("发生异常", Exception)
        df = pd.DataFrame()
    finally:
        db.close()
    if df.empty:
        return cache_video_id
    else:
        video_id = df[0].values.tolist()
        return video_id


# Join device_id and city_id onto the corresponding city's hot-diary table.
# Note: the feature order of the prediction set below must match the training set
def feature_en(x_list, device_id):
    data = pd.DataFrame(x_list)
    # The column name must be cid, not diaryid, because the ffm used by the prediction model uses cid
    data = data.rename(columns={0: "cid"})
    data["device_id"] = device_id
    now = datetime.now()
    data["hour"] = now.hour
    data["minute"] = now.minute
    data.loc[data["hour"] == 0, ["hour"]] = 24
    data.loc[data["minute"] == 0, ["minute"]] = 60
    data["hour"] = data["hour"].astype("category")
    data["minute"] = data["minute"].astype("category")
    # y is being predicted, but the ffm transform still needs a y column; it does not affect the prediction
    data["y"] = 0
    print("done 特征工程")
    return data


# Load ffm.pkl and convert the data above into ffm format
def transform_ffm_format(df, queue_name, device_id):
    with open(path + "ffm.pkl", "rb") as f:
        ffm_format_pandas = pickle.load(f)
        data = ffm_format_pandas.native_transform(df)
        predict_file_name = path + "result/{0}_{1}.csv".format(device_id, queue_name)
        data.to_csv(predict_file_name, index=False, header=None)
        print("done ffm")
        return predict_file_name


def predict(queue_name, queue_arg, device_id):
    data = feature_en(queue_arg[0], device_id)
    data_file_path = transform_ffm_format(data, queue_name, device_id)
    ffm_model = xl.create_ffm()
    ffm_model.setTest(data_file_path)
    ffm_model.setSigmoid()
    ffm_model.predict(path + "model.out",
                      path + "result/output{0}_{1}.csv".format(device_id, queue_name))


def save_result(queue_name, queue_arg, device_id):
    score_df = pd.read_csv(path + "result/output{0}_{1}.csv".format(device_id, queue_name), header=None)
    mm_scaler = MinMaxScaler()
    mm_scaler.fit(score_df)
    score_df = pd.DataFrame(mm_scaler.transform(score_df))
    score_df = score_df.rename(columns={0: "score"})
    score_df["cid"] = queue_arg[0]
    # Strip the "diary|" prefix from cid
    score_df["cid"] = score_df["cid"].apply(lambda x: x[6:])
    # print("score_df:")
    # print(score_df.head(1))
    # print(score_df.shape)
    if queue_arg[1] != []:
        df_temp = pd.DataFrame(queue_arg[1]).rename(columns={0: "cid"})
        df_temp["score"] = 0
        df_temp = df_temp.sort_index(axis=1, ascending=False)
        df_temp["cid"] = df_temp["cid"].apply(lambda x: x[6:])
        predict_score_df = score_df.append(df_temp)
        return predict_score_df
    else:
        return score_df


def get_score(queue_arg):
    if flag:
        db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                             passwd='3SYz54LS9#^9sBvC', db='eagle')
    else:
        db = pymysql.connect(host=LOCAL_HOST, port=3306, user='work', passwd='workwork', db='zhengxing_tes')
    cursor = db.cursor()
    # Strip the "diary|" prefix from diary_id
    diary_list = tuple(list(map(lambda x: x[6:], queue_arg[2])))
    sql = "select score,diary_id from biz_feed_diary_score where diary_id in {};".format(diary_list)
    cursor.execute(sql)
    result = cursor.fetchall()
    score_df = pd.DataFrame(list(result)).dropna()
    db.close()
    print("get score")
    return score_df


def update_dairy_queue(score_df, predict_score_df, total_video_id):
    diary_id = score_df["cid"].values.tolist()
    if total_video_id != []:
        video_id = list(set(diary_id) & set(total_video_id))
        if len(video_id) > 0:
            not_video = list(set(diary_id) - set(video_id))
            # Turn cid into the index so the scores line up when added
            not_video_df = score_df.loc[score_df["cid"].isin(not_video)].set_index(["cid"])
            not_video_predict_df = predict_score_df.loc[predict_score_df["cid"].isin(not_video)].set_index(["cid"])
            not_video_df["score"] = not_video_df["score"] + not_video_predict_df["score"]
            not_video_df = not_video_df.sort_values(by="score", ascending=False)
            video_df = score_df.loc[score_df["cid"].isin(video_id)].set_index(["cid"])
            video_predict_df = predict_score_df.loc[predict_score_df["cid"].isin(video_id)].set_index(["cid"])
            video_df["score"] = video_df["score"] + video_predict_df["score"]
            video_df = video_df.sort_values(by="score", ascending=False)
            not_video_id = not_video_df.index.tolist()
            video_id = video_df.index.tolist()
            new_queue = not_video_id
            i = 1
            for j in video_id:
                new_queue.insert(i, j)
                i += 5
            print("分数合并成功")
            return new_queue
        # If there are no video diaries left after the intersection
        else:
            score_df = score_df.set_index(["cid"])
            predict_score_df = predict_score_df.set_index(["cid"])
            score_df["score"] = score_df["score"] + predict_score_df["score"]
            score_df = score_df.sort_values(by="score", ascending=False)
            print("分数合并成功1")
            return score_df.index.tolist()
    # If total_video_id is an empty list
    else:
        score_df = score_df.set_index(["cid"])
        predict_score_df = predict_score_df.set_index(["cid"])
        score_df["score"] = score_df["score"] + predict_score_df["score"]
        score_df = score_df.sort_values(by="score", ascending=False)
        # print("分数合并成功1")
        return score_df.index.tolist()


def update_sql_dairy_queue(queue_name, diary_id, device_id, city_id):
    if flag:
        db = pymysql.connect(host=QUEUE_ONLINE_HOST, port=3306, user='doris', passwd='o5gbA27hXHHm', db='doris_prod')
    else:
        db = pymysql.connect(host=LOCAL_HOST, port=3306, user='work', passwd='workwork', db='doris_test')
    cursor = db.cursor()
    id_str = str(diary_id[0])
    for i in range(1, len(diary_id)):
        id_str = id_str + "," + str(diary_id[i])
    sql = "update device_diary_queue set {}='{}' where device_id = '{}' and city_id = '{}'".format \
        (queue_name, id_str, device_id, city_id)
    cursor.execute(sql)
    db.commit()
    db.close()
    print("成功写入diary_id")


def queue_compare(old_list, new_list):
    # Strip the "diary|" prefix
    old_list = list(map(lambda x: int(x[6:]), old_list))
    # print("旧表前十个")
    # print(old_list[:10])
    # print("新表前十个")
    # print(new_list[:10])
    temp = list(range(len(old_list)))
    x_dict = dict(zip(old_list, temp))
    temp = list(range(len(new_list)))
    y_dict = dict(zip(new_list, temp))
    i = 0
    for key in x_dict.keys():
        if x_dict[key] != y_dict[key]:
            i += 1
    if i > 0:
        print("日记队列更新前日记总个数{},位置发生变化个数{},发生变化率{}%".format(len(old_list), i, round(i / len(old_list) * 100), 2))


def get_queue(device_id, city_id, queue_name):
    if flag:
        db = pymysql.connect(host=QUEUE_ONLINE_HOST, port=3306, user='doris', passwd='o5gbA27hXHHm', db='doris_prod')
    else:
        db = pymysql.connect(host=LOCAL_HOST, port=3306, user='work', passwd='workwork', db='doris_test')
    cursor = db.cursor()
    sql = "select {} from device_diary_queue " \
          "where device_id = '{}' and city_id = '{}';".format(queue_name, device_id, city_id)
    cursor.execute(sql)
    result = cursor.fetchall()
    df = pd.DataFrame(list(result))
    if df.empty:
        print("该用户对应的日记为空")
        return False
    else:
        queue_list = df.loc[0, 0].split(",")
        queue_list = list(map(lambda x: "diary|" + str(x), queue_list))
        db.close()
        print("成功获取queue")
        return queue_list


def pipe_line(queue_name, queue_arg, device_id, total_video_id):
    predict(queue_name, queue_arg, device_id)
    predict_score_df = save_result(queue_name, queue_arg, device_id)
    score_df = get_score(queue_arg)
    if score_df.empty:
        print("获取的日记列表是空")
        return False
    else:
        score_df = score_df.rename(columns={0: "score", 1: "cid"})
        diary_queue = update_dairy_queue(score_df, predict_score_df, total_video_id)
        return diary_queue


def user_update(device_id, city_id, queue_name, data_set_cid, total_video_id):
    queue_list = get_queue(device_id, city_id, queue_name)
    if queue_list:
        queue_predict = list(set(queue_list) & set(data_set_cid))
        queue_not_predict = list(set(queue_list) - set(data_set_cid))
        queue_arg = [queue_predict, queue_not_predict, queue_list]
        if queue_predict != []:
            diary_queue = pipe_line(queue_name, queue_arg, device_id, total_video_id)
            if diary_queue:
                update_sql_dairy_queue(queue_name, diary_queue, device_id, city_id)
                queue_compare(queue_list, diary_queue)
                # print("更新结束")
            else:
                print("获取的日记列表是空,所以不更新日记队列")
        else:
            print("预测集是空,不需要预测")
    else:
        print("日记队列为空")


def multi_proecess_update(device_id, city_id, data_set_cid, total_video_id):
    queue_name_list = ["native_queue", "nearby_queue", "nation_queue", "megacity_queue"]
    pool = Pool(4)
    for queue_name in queue_name_list:
        pool.apply_async(user_update, (device_id, city_id, queue_name, data_set_cid, total_video_id,))
    pool.close()
    pool.join()


if __name__ == "__main__":
    warnings.filterwarnings("ignore")
    flag = True
    path = DIRECTORY_PATH
    # The IP below is the local development machine's IP
    if socket.gethostbyname(socket.gethostname()) == '172.30.8.160':
        flag = False
        path = LOCAL_DIRCTORY
    # Cache of diary video ids
    cache_video_id = []
    cache_device_city_list = []
    differ = 0
    while True:
        data_set_cid = pd.read_csv(path + "data_set_cid.csv")["cid"].values.tolist()
        total_video_id = get_video_id(cache_video_id)
        cache_video_id = total_video_id
        start = time.time()
        device_city_list = get_active_users(flag, path, differ)
        # Skip users already predicted within the last 5 minutes
        device_city_list = list(set(tuple(device_city_list)) - set(tuple(cache_device_city_list)))
        if datetime.now().minute % 5 == 0:
            cache_device_city_list = []
        if device_city_list != []:
            cache_device_city_list.extend(device_city_list)
            for device_city in device_city_list:
                multi_proecess_update(device_city[0], device_city[1], data_set_cid, total_video_id)
        differ = time.time() - start
# # TODO Switch user prediction to multiprocessing after going live
restruct/keepProcess.py · deleted 100644 → 0 · View file @ 6560d5e2

import os
import time


def check():
    out = os.popen("ps aux | grep diaryQueueUpdate.py").read()
    flag = 1
    for line in out.splitlines():
        if 'python diaryQueueUpdate.py' in line:
            flag = 2
    return flag


if __name__ == "__main__":
    # TODO After the official launch, enable the loop and the time.sleep below
    # while True:
    if check() == 1:
        os.popen('python diaryQueueUpdate.py')
        print("成功重启diaryQueueUpdate")
    # time.sleep(300)
\ No newline at end of file
restruct/predictDiary.py · deleted 100644 → 0 · View file @ 6560d5e2

from config import *
import pandas as pd
import pickle
import xlearn as xl
from userProfile import *
import time
from utils import *
import os


# Join device_id and city_id onto the corresponding city's hot-diary table.
# Note: the feature order of the prediction set below must match the training set
def feature_en(user_profile):
    file_name = DIRECTORY_PATH + "diaryTestSet/{0}DiaryTop3000.csv".format(user_profile['city_id'])
    data = pd.read_csv(file_name)
    data["device_id"] = user_profile['device_id']
    now = datetime.now()
    data["hour"] = now.hour
    data["minute"] = now.minute
    data.loc[data["hour"] == 0, ["hour"]] = 24
    data.loc[data["minute"] == 0, ["minute"]] = 60
    data["hour"] = data["hour"].astype("category")
    data["minute"] = data["minute"].astype("category")
    # y is being predicted, but the ffm transform still needs a y column; it does not affect the prediction
    data["y"] = 0
    data = data.drop("city_id", axis=1)
    return data


# Load ffm.pkl and convert the table above into ffm format
def transform_ffm_format(df, device_id):
    with open(DIRECTORY_PATH + "ffm.pkl", "rb") as f:
        ffm_format_pandas = pickle.load(f)
        data = ffm_format_pandas.transform(df)
        now = datetime.now().strftime("%Y-%m-%d-%H-%M")
        predict_file_name = DIRECTORY_PATH + "result/{0}_{1}DiaryTop3000.csv".format(device_id, now)
        data.to_csv(predict_file_name, index=False, header=None)
        print("成功将ffm预测文件写到服务器")
        return predict_file_name


# Load the model, predict, sort the predicted diary probabilities in descending order and save them
def predict(user_profile):
    instance = feature_en(user_profile)
    instance_file_path = transform_ffm_format(instance, user_profile["device_id"])
    ffm_model = xl.create_ffm()
    ffm_model.setTest(instance_file_path)
    ffm_model.setSigmoid()
    ffm_model.predict(DIRECTORY_PATH + "model.out",
                      DIRECTORY_PATH + "result/{0}_output.txt".format(user_profile['device_id']))
    print("该用户预测结束")
    predict_save_to_local(user_profile, instance)


# Join the predictions with device_id and sort by probability in descending order
def wrapper_result(user_profile, instance):
    proba = pd.read_csv(DIRECTORY_PATH + "result/{0}_output.txt".format(user_profile['device_id']), header=None)
    proba = proba.rename(columns={0: "prob"})
    proba["cid"] = instance['cid']
    proba = proba.sort_values(by="prob", ascending=False)
    proba = proba.head(50)
    return proba


# Save the prediction candidate set locally
def predict_save_to_local(user_profile, instance):
    proba = wrapper_result(user_profile, instance)
    proba.loc[:, "url"] = proba["cid"].apply(lambda x: "http://m.igengmei.com/diary_book/" + str(x[6:]) + '/')
    proba.to_csv(DIRECTORY_PATH + "result/feed_{}".format(user_profile['device_id']), index=False)
    print("成功将预测候选集保存到本地")


def router(device_id):
    user_profile, not_exist = fetch_user_profile(device_id)
    if not_exist:
        print('Sorry, we don\'t have you.')
    else:
        predict(user_profile)


# Multi-process prediction
def multi_predict(predict_list, processes=12):
    pool = Pool(processes)
    for device_id in predict_list:
        start = time.time()
        pool.apply_async(router, (device_id,))
        end = time.time()
        print("该用户{}预测耗时{}秒".format(device_id, (end - start)))
    pool.close()
    pool.join()


if __name__ == "__main__":
    # TODO If a run takes less than a minute, the next fetch returns the same device_ids as the previous one.
    # Also, a continuously active user will be predicted repeatedly.
    while True:
        empty, device_id_list = get_active_users()
        if empty:
            for eachFile in os.listdir("/tmp"):
                if "xlearn" in eachFile:
                    os.remove("/tmp" + "/" + eachFile)
            time.sleep(58)
        else:
            old_device_id_list = pd.read_csv(DIRECTORY_PATH + "data_set_device_id.csv")["device_id"].values.tolist()
            # Intersect active users with old users, i.e. only predict old users
            predict_list = list(set(device_id_list) & set(old_device_id_list))
            multi_predict(predict_list)
            # TODO Before going live, remove or comment out the timers and prints in the prediction path,
            # since prediction is performance-critical: the fewer statements the better
restruct/prepareData.py · deleted 100644 → 0 · View file @ 6560d5e2

from utils import con_sql
import datetime
import time


def fetch_data(start_date, end_date):
    # Get device_id from the click table
    sql = "select distinct device_id from data_feed_click2"
    click_device_id = con_sql(sql)[0].values.tolist()
    print("成功获取点击表里的device_id")
    # Get the click-table data
    sql = "select cid,device_id,time,stat_date from data_feed_click2 " \
          "where stat_date >= '{0}' and stat_date <= '{1}'".format(start_date, end_date)
    click = con_sql(sql)
    click = click.rename(columns={0: "cid", 1: "device_id", 2: "time_date", 3: "stat_date"})
    print("成功获取点击表里的数据")
    # Extract hour from the time feature
    click["hour"] = click["time_date"].apply(lambda x: datetime.datetime.fromtimestamp(x).hour)
    click["minute"] = click["time_date"].apply(lambda x: datetime.datetime.fromtimestamp(x).minute)
    click = click.drop("time_date", axis=1)
    # Get the exposure-table data
    sql = "select cid,device_id,time,stat_date from data_feed_exposure2 " \
          "where stat_date >= '{0}' and stat_date <= '{1}'".format(start_date, end_date)
    start = time.time()
    exposure = con_sql(sql)
    end = time.time()
    print("获取曝光表耗时{}分".format((end - start) / 60))
    exposure = exposure.rename(columns={0: "cid", 1: "device_id", 2: "time_date", 3: "stat_date"})
    print("成功获取曝光表里的数据")
    # Extract hour from the time feature
    exposure["hour"] = exposure["time_date"].apply(lambda x: datetime.datetime.fromtimestamp(x).hour)
    exposure["minute"] = exposure["time_date"].apply(lambda x: datetime.datetime.fromtimestamp(x).minute)
    exposure = exposure.drop("time_date", axis=1)
    return exposure, click, click_device_id
restruct/train.py · deleted 100644 → 0 · View file @ 6560d5e2

from dataProcess import *
from diaryTraining import *
import time
from utils import *


# Chain data fetching, feature transformation and model training together
if __name__ == "__main__":
    # while True:
    #     now = datetime.now()
    #     if (now.hour == 23) and (now.minute == 30):
    start_train = time.time()
    data_start_date, data_end_date, validation_date, test_date = get_date()
    data, test_number, validation_number = feature_en(data_start_date, data_end_date, validation_date, test_date)
    ffm_transform(data, test_number, validation_number)
    train()
    end_train = time.time()
    print("训练模型耗时{}分".format((end_train - start_train) / 60))
    move_file()
    # TODO If the homemade keepProcess script guards the process, remove the restart call below,
    # otherwise the process may be started one extra time
    # restart_process()
restruct/userProfile.py · deleted 100644 → 0 · View file @ 6560d5e2

from utils import con_sql
from datetime import datetime
from config import *
import pandas as pd
import os
import time
import pymysql
import time


# Get users active within the last minute
def get_active_users(flag, path, differ):
    if differ == 0:
        end = time.time()
        start = time.time() - 60
    elif 0 < differ < 10:
        time.sleep(30)
        differ += 30
        end = time.time()
        start = end - differ
    else:
        end = time.time()
        start = end - differ
    end_datetime = str(datetime.fromtimestamp(end))
    start_datetime = str(datetime.fromtimestamp(start))
    sql = "select device_id,city_id from user_active_time " \
          "where active_time <= '{}' and active_time >= '{}'".format(end_datetime, start_datetime)
    if flag:
        df = con_sql(sql)
    else:
        db = pymysql.connect(host='192.168.15.12', port=4000, user='root', db='jerry_test')
        sql = "select device_id,city_id from user_active_time"
        cursor = db.cursor()
        cursor.execute(sql)
        result = cursor.fetchall()
        df = pd.DataFrame(list(result)).dropna()
        db.close()
    if df.empty:
        print("当下没有活跃用户数")
        return []
    # Count active users whose device_id ends in 6
    else:
        temp_list = df[0].values.tolist()
        now = datetime.now()
        tail6_file_path = path + "{}tail6Unique.csv".format(str(now)[:10])
        if os.path.exists(tail6_file_path):
            # Number of active users whose id ends in 6
            tail_6_list = eval(pd.read_csv(tail6_file_path).loc[0, "list"])
        else:
            tail_6_list = []
        tail_6_list.extend(list(filter(lambda x: (str(x)[-1] == "6"), temp_list)))
        if tail_6_list != []:
            df_tail_6 = pd.DataFrame({"number": [len(set(tail_6_list))], "time": [str(now)[:16]],
                                      "list": [list(set(tail_6_list))]})
            df_tail_6.to_csv(tail6_file_path, index=None)
        print("截止现在尾号是6的独立活跃数:{}".format(len(set(tail_6_list))))

        old_device_id_list = pd.read_csv(path + "data_set_device_id.csv")["device_id"].values.tolist()
        # Intersect active users with old users, i.e. only predict old users
        df = df.loc[df[0].isin(old_device_id_list)]
        if df.empty:
            print("该列表是新用户,不需要预测")
            return []
        else:
            # TODO Comment out the tail-6-only prediction below after going live
            # Only predict ids ending in 6; this is required by QA
            device_temp_list = df[0].values.tolist()
            predict_list = list(filter(lambda x: (str(x)[-1] == "6") or (str(x) == "358035085192742")
                                       or str(x) == "AB20292B-5D15-4C44-9429-1C2FF5ED26F6", device_temp_list))
            if predict_list == []:
                print('没有尾号是6和目标用户')
                return []
            else:
                df = df.loc[df[0].isin(predict_list)]
                device_list = df[0].values.tolist()
                city_list = df[1].values.tolist()
                device_city_list = list(zip(device_list, city_list))
                print("当下这一分钟预测用户数量:{}".format(len(device_city_list)))
                # Count predicted users whose id ends in 6
                predict_file_path = path + "{}predictTail6Unique.csv".format(str(now)[:10])
                if os.path.exists(predict_file_path):
                    # Number of tail-6 users already predicted
                    all_predict_list = eval(pd.read_csv(predict_file_path).loc[0, "list"])
                else:
                    all_predict_list = []
                all_predict_list.extend(device_list)
                if all_predict_list != []:
                    df_predict = pd.DataFrame({"number": [len(set(all_predict_list))], "time": [str(now)[:16]],
                                               "list": [list(set(all_predict_list))]})
                    df_predict.to_csv(predict_file_path, index=None)
                return device_city_list


def fetch_user_profile(device_id):
    sql = "select device_id,city_id from data_feed_click where device_id = '{0}' limit 1".format(device_id)
    user_profile = con_sql(sql)
    if user_profile.empty:
        print("没有获取到该用户对应的city_id")
        return None, True
    else:
        user_profile = user_profile.rename(columns={0: "device_id", 1: "city_id"})
        user_profile_dict = {}
        for i in user_profile.columns:
            user_profile_dict[i] = user_profile.loc[0, i]
        return user_profile_dict, False
restruct/utils.py
deleted
100644 → 0
View file @
6560d5e2
# encoding = "utf-8"
from datetime import datetime
from datetime import timedelta
import pymysql
import numpy as np
import pandas as pd
from sklearn import metrics
from sklearn.metrics import auc
from multiprocessing import Pool
import os
import signal
from config import *


def get_date():
    now = datetime.now()
    year = now.year
    month = now.month
    day = now.day
    date = datetime(year, month, day)
    data_start_date = "2018-07-15"
    data_end_date = "2018-08-30"
    validation_date = "2018-08-29"
    # data_start_date = (date - timedelta(days=3)).strftime("%Y-%m-%d")
    # data_end_date = (date - timedelta(days=1)).strftime("%Y-%m-%d")
    # validation_date = (date - timedelta(days=2)).strftime("%Y-%m-%d")
    # The validation and test dates must be exactly one day apart,
    # otherwise splitting the data set raises an error
    test_date = data_end_date
    print("data_start_date,data_end_date,validation_date,test_date:")
    print(data_start_date, data_end_date, validation_date, test_date)
    return data_start_date, data_end_date, validation_date, test_date


def get_roc_curve(y, pred, pos_label):
    """
    Compute the ROC curve and AUC for a binary classification problem.
    """
    fpr, tpr, thresholds = metrics.roc_curve(y, pred, pos_label)
    AUC = metrics.auc(fpr, tpr)
    print(AUC)


# Fetch data from a TiDB table and convert it into a DataFrame
def con_sql(sql):
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    df = pd.DataFrame(list(result)).dropna()
    db.close()
    return df


def move_file():
    import os
    for eachFile in os.listdir(DIRECTORY_PATH + "train"):
        os.rename(DIRECTORY_PATH + "train" + "/" + eachFile, DIRECTORY_PATH + eachFile)
    print("成功将文件剪切到对应路径")


def restart_process():
    out = os.popen("ps aux | grep diaryUpdateOnlineOffline.py").read()
    for line in out.splitlines():
        if 'python diaryUpdateOnlineOffline.py' in line:
            pid = int(line.split()[1])
            # Some processes are very short-lived or may exit at any moment,
            # so this exception must be caught
            try:
                os.kill(pid, signal.SIGKILL)
                print("已杀死python diaryUpdateOnlineOffline.py 进程")
            except OSError:
                print('没有如此进程!!!')
            os.popen('python diaryUpdateOnlineOffline.py')
            print("成功重启diaryUpdateOnlineOffline.py")
        else:
            os.popen('python diaryUpdateOnlineOffline.py')
            print("成功重启diaryUpdateOnlineOffline.py")


# Multi-threaded ffm conversion class:
class multiFFMFormatPandas:
    def __init__(self):
        self.field_index_ = None
        self.feature_index_ = None
        self.y = None

    def fit(self, df, y=None):
        self.y = y
        df_ffm = df[df.columns.difference([self.y])]
        if self.field_index_ is None:
            self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
        if self.feature_index_ is not None:
            last_idx = max(list(self.feature_index_.values()))
        if self.feature_index_ is None:
            self.feature_index_ = dict()
            last_idx = 0
        for col in df.columns:
            vals = df[col].unique()
            for val in vals:
                if pd.isnull(val):
                    continue
                name = '{}_{}'.format(col, val)
                if name not in self.feature_index_:
                    self.feature_index_[name] = last_idx
                    last_idx += 1
            self.feature_index_[col] = last_idx
            last_idx += 1
        return self

    def fit_transform(self, df, y=None, n=50000, processes=4):
        # n is the maximum number of rows each worker handles, processes is the number of workers
        self.fit(df, y)
        n = n
        processes = processes
        return self.transform(df, n, processes)

    def transform_row_(self, row, t):
        ffm = []
        if self.y is not None:
            ffm.append(str(row.loc[row.index == self.y][0]))
        if self.y is None:
            ffm.append(str(0))
        for col, val in row.loc[row.index != self.y].to_dict().items():
            col_type = t[col]
            name = '{}_{}'.format(col, val)
            if col_type.kind == 'O':
                ffm.append('{}:{}:1'.format(self.field_index_[col], self.feature_index_[name]))
            elif col_type.kind == 'i':
                ffm.append('{}:{}:{}'.format(self.field_index_[col], self.feature_index_[col], val))
        return ' '.join(ffm)

    def transform(self, df, n=1500, processes=2):
        # n is the maximum number of rows each worker handles, processes is the number of workers
        t = df.dtypes.to_dict()
        data_list = self.data_split_line(df, n)
        # Set the number of processes
        pool = Pool(processes)
        print("总进度: " + str(len(data_list)))
        for i in range(len(data_list)):
            data_list[i] = pool.apply_async(self.pool_function, (data_list[i], t,))
        result_map = {}
        for i in data_list:
            result_map.update(i.get())
        pool.close()
        pool.join()
        return pd.Series(result_map)

    # Method run by each worker process
    def pool_function(self, df, t):
        return {idx: self.transform_row_(row, t) for idx, row in df.iterrows()}

    # Split a DataFrame by a step size; returns a list of DataFrames, each holding a chunk of rows
    def data_split_line(self, data, step):
        data_list = []
        x = 0
        while True:
            if x + step < data.__len__():
                data_list.append(data.iloc[x:x + step])
                x = x + step + 1
            else:
                data_list.append(data.iloc[x:data.__len__()])
                break
        '''
        # Generator version; local tests showed it was not more efficient
        x = 0
        while True:
            if x + step < data.__len__():
                yield data.iloc[x:x + step]
                x = x + step + 1
            else:
                yield data.iloc[x:data.__len__()]
                break
        '''
        return data_list

    # Native (single-process) conversion, no multiprocessing needed
    def native_transform(self, df):
        t = df.dtypes.to_dict()
        return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})

    # This method is not part of the original class; it was added to check
    # whether a user exists in the training data set
    def is_feature_index_exist(self, name):
        if name in self.feature_index_:
            return True
        else:
            return False
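The multiFFMFormatPandas class above converts a pandas DataFrame into libffm-style text lines of the form "label field:feature:value ...". A minimal sketch of how it might be driven is given below; it assumes the class above is in scope (or importable from this module), and the toy DataFrame and its column names are invented for illustration.

import pandas as pd
# from utils import multiFFMFormatPandas  # assumption: the class above is importable

# Toy click sample: "y" is the label, the other columns are categorical features.
df = pd.DataFrame({"y": [0, 1, 1],
                   "device_id": ["a6", "b6", "a6"],
                   "city_id": ["beijing", "shanghai", "beijing"]})

ffm = multiFFMFormatPandas()
ffm.fit(df, y="y")
# native_transform avoids spawning worker processes, which is enough for a tiny sample;
# fit_transform(df, y="y", n=..., processes=...) is the multi-process path.
lines = ffm.native_transform(df)
print(lines.iloc[0])  # one text line per row: "<label> <field>:<feature>:1 ..."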
# ffm format conversion function and class (old single-process version, kept commented out below)
# class FFMFormatPandas:
# def __init__(self):
# self.field_index_ = None
# self.feature_index_ = None
# self.y = None
#
# def fit(self, df, y=None):
# self.y = y
# df_ffm = df[df.columns.difference([self.y])]
# if self.field_index_ is None:
# self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
#
# if self.feature_index_ is not None:
# last_idx = max(list(self.feature_index_.values()))
#
# if self.feature_index_ is None:
# self.feature_index_ = dict()
# last_idx = 0
#
# for col in df.columns:
# vals = df[col].unique()
# for val in vals:
# if pd.isnull(val):
# continue
# name = '{}_{}'.format(col, val)
# if name not in self.feature_index_:
# self.feature_index_[name] = last_idx
# last_idx += 1
# self.feature_index_[col] = last_idx
# last_idx += 1
# return self
#
# def fit_transform(self, df, y=None):
# self.fit(df, y)
# return self.transform(df)
#
# def transform_row_(self, row, t):
# ffm = []
# if self.y is not None:
# ffm.append(str(row.loc[row.index == self.y][0]))
# if self.y is None:
# ffm.append(str(0))
#
# for col, val in row.loc[row.index != self.y].to_dict().items():
# col_type = t[col]
# name = '{}_{}'.format(col, val)
# if col_type.kind == 'O':
# ffm.append('{}:{}:1'.format(self.field_index_[col], self.feature_index_[name]))
# elif col_type.kind == 'i':
# ffm.append('{}:{}:{}'.format(self.field_index_[col], self.feature_index_[col], val))
# return ' '.join(ffm)
#
# def transform(self, df):
# t = df.dtypes.to_dict()
# return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})
#
# # This method is not part of the original class; it was added to check whether a user exists in the training data set
# def is_feature_index_exist(self, name):
# if name in self.feature_index_:
# return True
# else:
# return False
train.py
View file @
af381d3d
...
...
@@ -18,8 +18,7 @@ if __name__ == "__main__":
    end_train = time.time()
    print("训练模型耗时{}分".format((end_train - start_train) / 60))
    move_file()
    # TODO: if the custom keepProcess script supervises this job, delete the restart call below
    # from this function, otherwise the process may be started one extra time
    restart_process()
...
...
userProfile.py
View file @
af381d3d
...
...
@@ -8,6 +8,37 @@ import pymysql
import time


# Count active users whose device id ends in 6
def unique_user_count(file_path, temp_list, now):
    if os.path.exists(file_path):
        # Active users whose tail number is 6
        tail_6_list = eval(pd.read_csv(file_path).loc[0, "list"])
    else:
        tail_6_list = []
    tail_6_list.extend(list(filter(lambda x: (str(x)[-1] == "6"), temp_list)))
    if tail_6_list != []:
        df_tail_6 = pd.DataFrame({"number": [len(set(tail_6_list))], "time": [str(now)[:16]],
                                  "list": [list(set(tail_6_list))]})
        df_tail_6.to_csv(file_path, index=None)
    print("截止现在尾号是6的独立活跃数:{}".format(len(set(tail_6_list))))


# Count the distinct users that have already been predicted
def predict_user_count(predict_file_path, device_list, now):
    if os.path.exists(predict_file_path):
        # Users ending in 6 that have already been predicted
        all_predict_list = eval(pd.read_csv(predict_file_path).loc[0, "list"])
    else:
        all_predict_list = []
    all_predict_list.extend(device_list)
    if all_predict_list != []:
        df_predict = pd.DataFrame({"number": [len(set(all_predict_list))], "time": [str(now)[:16]],
                                   "list": [list(set(all_predict_list))]})
        df_predict.to_csv(predict_file_path, index=None)
    print("截止现在预测过尾号是6的独立活跃数:{}".format(len(set(all_predict_list))))


# Get the users active within the last minute
def get_active_users(flag, path, differ):
    if differ == 0:
...
...
@@ -23,18 +54,18 @@ def get_active_users(flag,path,differ):
    start = end - differ
    end_datetime = str(datetime.fromtimestamp(end))
    start_datetime = str(datetime.fromtimestamp(start))
    sql = "select device_id,city_id from user_active_time " \
          "where active_time <= '{}' and active_time >= '{}'".format(end_datetime, start_datetime)
    if flag:
        df = con_sql(sql)
        sql = "select device_id,city_id from user_active_time " \
              "where active_time <= '{}' and active_time >= '{}'".format(end_datetime, start_datetime)
        db = pymysql.connect(host=ACTIVE_USER_DB_ONLINE["host"], port=ACTIVE_USER_DB_ONLINE["port"],
                             user=ACTIVE_USER_DB_ONLINE["user"], passwd=ACTIVE_USER_DB_ONLINE["passwd"],
                             db=ACTIVE_USER_DB_ONLINE["db"])
        df = con_sql(db, sql)
    else:
        db = pymysql.connect(host='192.168.15.12', port=4000, user='root', db='jerry_test')
        db = pymysql.connect(host=ACTIVE_USER_DB_LOCAL["host"], port=ACTIVE_USER_DB_LOCAL["port"],
                             user=ACTIVE_USER_DB_LOCAL["user"], db=ACTIVE_USER_DB_LOCAL["db"])
        sql = "select device_id,city_id from user_active_time"
        cursor = db.cursor()
        cursor.execute(sql)
        result = cursor.fetchall()
        df = pd.DataFrame(list(result)).dropna()
        db.close()
        df = con_sql(db, sql)
    if df.empty:
        print("当下没有活跃用户数")
...
...
@@ -44,19 +75,20 @@ def get_active_users(flag,path,differ):
    temp_list = df[0].values.tolist()
    now = datetime.now()
    tail6_file_path = path + "{}tail6Unique.csv".format(str(now)[:10])
    if os.path.exists(tail6_file_path):
        # Active users whose tail number is 6
        tail_6_list = eval(pd.read_csv(tail6_file_path).loc[0, "list"])
    else:
        tail_6_list = []
    tail_6_list.extend(list(filter(lambda x: (str(x)[-1] == "6"), temp_list)))
    if tail_6_list != []:
        df_tail_6 = pd.DataFrame({"number": [len(set(tail_6_list))], "time": [str(now)[:16]],
                                  "list": [list(set(tail_6_list))]})
        df_tail_6.to_csv(tail6_file_path, index=None)
    print("截止现在尾号是6的独立活跃数:{}".format(len(set(tail_6_list))))
    unique_user_count(tail6_file_path, temp_list, now)
# if os.path.exists(tail6_file_path):
# # 尾号是6的活跃用户数
# tail_6_list = eval(pd.read_csv(tail6_file_path).loc[0, "list"])
# else:
# tail_6_list = []
#
# tail_6_list.extend(list(filter(lambda x: (str(x)[-1] == "6"), temp_list)))
# if tail_6_list != []:
# df_tail_6 = pd.DataFrame({"number": [len(set(tail_6_list))], "time": [str(now)[:16]],
# "list": [list(set(tail_6_list))]})
# df_tail_6.to_csv(tail6_file_path, index=None)
#
# print("截止现在尾号是6的独立活跃数:{}".format(len(set(tail_6_list))))
    old_device_id_list = pd.read_csv(path + "data_set_device_id.csv")["device_id"].values.tolist()
    # Intersect the active users with the existing (old) users: only old users are predicted
    df = df.loc[df[0].isin(old_device_id_list)]
...
...
@@ -83,23 +115,25 @@ def get_active_users(flag,path,differ):
            # Count the predicted users whose tail number is 6
            predict_file_path = path + "{}predictTail6Unique.csv".format(str(now)[:10])
            if os.path.exists(predict_file_path):
                # Users ending in 6 that have already been predicted
                all_predict_list = eval(pd.read_csv(predict_file_path).loc[0, "list"])
            else:
                all_predict_list = []
            all_predict_list.extend(device_list)
            if all_predict_list != []:
                df_predict = pd.DataFrame({"number": [len(set(all_predict_list))], "time": [str(now)[:16]],
                                           "list": [list(set(all_predict_list))]})
                df_predict.to_csv(predict_file_path, index=None)
            predict_user_count(predict_file_path, device_list, now)
# if os.path.exists(predict_file_path):
# # 预测过尾号是6的用户数
# all_predict_list = eval(pd.read_csv(predict_file_path).loc[0, "list"])
# else:
# all_predict_list = []
# all_predict_list.extend(device_list)
# if all_predict_list != []:
# df_predict = pd.DataFrame({"number": [len(set(all_predict_list))], "time": [str(now)[:16]],
# "list": [list(set(all_predict_list))]})
# df_predict.to_csv(predict_file_path, index=None)
            return device_city_list


def fetch_user_profile(device_id):
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select device_id,city_id from data_feed_click where device_id = '{0}' limit 1".format(device_id)
    user_profile = con_sql(sql)
    user_profile = con_sql(db, sql)
    if user_profile.empty:
        print("没有获取到该用户对应的city_id")
        return None, True
...
...
utils.py
View file @
af381d3d
...
...
@@ -12,14 +12,16 @@ import signal
from config import *
import socket


def judge_online():
    flag = True
    path = DIRECTORY_PATH
    # The ip below is the local development machine's ip
    if socket.gethostbyname(socket.gethostname()) == '172.30.8.160':
        flag = False
        path = LOCAL_DIRCTORY
        return flag,
    else:
        flag = True
        path = DIRECTORY_PATH
        return flag, path
def get_date():
...
...
@@ -50,13 +52,32 @@ def get_roc_curve(y, pred, pos_label):
    print(AUC)


# Fetch data from a TiDB table and convert it into a DataFrame
def con_sql(sql):
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='jerry_test')


# Fetch data from a TiDB table, convert it into a DataFrame and drop null rows
def con_sql(db, sql):
    cursor = db.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
        df = pd.DataFrame(list(result)).dropna()
    except Exception:
        print("发生异常", Exception)
        df = pd.DataFrame()
    finally:
        db.close()
    return df


# The difference from the function above: that one drops null rows, this one keeps them
def sql_df(db, sql):
    cursor = db.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
        df = pd.DataFrame(list(result))
    except Exception:
        print("发生异常", Exception)
        df = pd.DataFrame()
    finally:
        db.close()
    return df
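After this change the caller owns the database connection and passes it in; con_sql drops null rows while sql_df keeps them. A small usage sketch of the new two-argument signature follows, assuming the ACTIVE_USER_DB_LOCAL dict from config.py is importable; the query and its limit are illustrative only.

import pymysql
# from config import ACTIVE_USER_DB_LOCAL  # assumption: config.py is on the path
# from utils import con_sql                # assumption: the two-argument version above

db = pymysql.connect(host=ACTIVE_USER_DB_LOCAL["host"], port=ACTIVE_USER_DB_LOCAL["port"],
                     user=ACTIVE_USER_DB_LOCAL["user"], db=ACTIVE_USER_DB_LOCAL["db"])
# con_sql executes the query, closes the connection in its finally block,
# and returns a DataFrame with null rows dropped (use sql_df to keep them).
df = con_sql(db, "select device_id,city_id from user_active_time limit 10")
print(df.shape)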
...
...
@@ -80,7 +101,7 @@ def restart_process():
            except OSError:
                print('没有如此进程!!!')
            os.popen('python diaryUpdateOnlineOffline.py')
            print("成功重启diaryUpdateOnlineOffline.py")
            print("已经成功重启diaryUpdateOnlineOffline.py")
        else:
            os.popen('python diaryUpdateOnlineOffline.py')
            print("成功重启diaryUpdateOnlineOffline.py")
...
...