Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
F
ffm-baseline
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ML
ffm-baseline
Commits
f154eb8d
Commit
f154eb8d
authored
Sep 26, 2018
by
高雅喆
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'master' into gyz
parents
eae303de
ae152f48
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
433 additions
and
167 deletions
+433
-167
config.py
config.py
+29
-11
ctr.py
ctr.py
+92
-0
diaryUpdateOnlineOffline.py
diaryUpdateOnlineOffline.py
+28
-26
ctr.py
local/ctr.py
+41
-0
testCases.py
local/testCases.py
+55
-12
utils.py
local/utils.py
+64
-62
prepareData.py
prepareData.py
+12
-8
train.py
train.py
+1
-2
userProfile.py
userProfile.py
+67
-33
utils.py
utils.py
+44
-13
No files found.
config.py
View file @
f154eb8d
# Online base path for model artifacts.
DIRECTORY_PATH = '/data/models/'

# The test date must be later than the validation date, because the
# data-set splitting code depends on that ordering.
# VALIDATION_DATE = '2018-08-05'
# TEST_DATE = '2018-08-06'
# DATA_START_DATE = '2018-07-05'
# DATA_END_DATE = '2018-08-06'

# Local base path.
LOCAL_DIRCTORY = "/Users/mac/utils/"

# Online active-user table DB.
ACTIVE_USER_DB_ONLINE = {"host": '10.66.157.22', "port": 4000, "user": 'root',
                         "passwd": '3SYz54LS9#^9sBvC', "db": 'jerry_prod'}

# Local active-user table DB (no password configured).
ACTIVE_USER_DB_LOCAL = {"host": '192.168.15.12', "port": 4000, "user": 'root',
                        "db": 'jerry_test'}

# Online diary-queue DB.
QUEUE_DB_ONLINE = {"host": 'rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com',
                   "port": 3306, "user": 'doris', "passwd": 'o5gbA27hXHHm',
                   "db": 'doris_prod'}

# Local diary-queue DB.
QUEUE_DB_LOCAL = {"host": 'rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com',
                  "port": 3306, "user": 'work', "passwd": 'workwork',
                  "db": 'doris_test'}

# Online diary-score DB.
SCORE_DB_ONLINE = {"host": '10.66.157.22', "port": 4000, "user": 'root',
                   "passwd": '3SYz54LS9#^9sBvC', "db": 'eagle'}

# Local diary-score DB.
SCORE_DB_LOCAL = {"host": 'rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com',
                  "port": 3306, "user": 'work', "passwd": 'workwork',
                  "db": 'zhengxing_test'}

MODEL_VERSION = ''

# Training hyper-parameters: learning rate and L2 regularization weight.
lr = 0.03
l2_lambda = 0.002
...
...
@@ -18,9 +32,6 @@ ONLINE_EAGLE_HOST = '10.66.157.22'
# 测试日记视频所在的ip
LOCAL_EAGLE_HOST
=
"192.168.15.12"
# 本地地址
LOCAL_DIRCTORY
=
"/Users/mac/utils/"
# 线上日记队列域名
QUEUE_ONLINE_HOST
=
'rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com'
...
...
@@ -28,3 +39,10 @@ QUEUE_ONLINE_HOST = 'rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com'
LOCAL_HOST
=
'rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com'
ctr.py
0 → 100644
View file @
f154eb8d
import
pandas
as
pd
import
pymysql
from
datetime
import
datetime
from
datetime
import
timedelta
def get_tail8():
    """Return a tuple of distinct device ids ending in "8" that clicked
    on ``stat_date``.

    NOTE: relies on the module-level globals ``stat_date`` and ``cid_type``
    set in the __main__ block.
    """
    sql = "select distinct device_id from data_feed_click \
       where stat_date='{}' \
       and cid_type='{}' \
       and device_id regexp '8$';".format(stat_date, cid_type)
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    cursor = db.cursor()
    print("开始获取")
    cursor.execute(sql)
    print("成功获取")
    rows = cursor.fetchall()
    db.close()
    # Column 0 of the result set holds the device ids.
    user = tuple(pd.DataFrame(list(rows))[0].values.tolist())
    print("尾号是8的用户个数")
    print(len(user))
    return user
def get_ctr(user_tuple):
    """Print the click count, exposure count and CTR of the given devices.

    Uses the module-level globals ``stat_date`` and ``cid_type``.

    :param user_tuple: tuple of device ids to restrict both counts to.
    :return: click/exposure ratio as a float, or None when there were no
        exposures (previously this raised ZeroDivisionError).
    """
    sql = "select count(device_id) from data_feed_click \
       where stat_date='{}' \
       and cid_type='{}' \
       and device_id in {}".format(stat_date, cid_type, user_tuple)
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    cursor = db.cursor()
    print("开始获取")
    cursor.execute(sql)
    click = cursor.fetchall()[0][0]
    print(click)
    sql = "select count(device_id) from data_feed_exposure \
       where stat_date='{}' \
       and cid_type='{}' \
       and device_id in {}".format(stat_date, cid_type, user_tuple)
    cursor = db.cursor()
    print("开始获取")
    cursor.execute(sql)
    exp = cursor.fetchall()[0][0]
    db.close()
    print(exp)
    # BUG FIX: guard against zero exposures to avoid ZeroDivisionError.
    if exp == 0:
        print("曝光数为0,无法计算ctr")
        return None
    ctr = click / exp
    print(ctr)
    return ctr
def get_tail6():
    """Load yesterday's predicted tail-6 device ids from CSV and return,
    as a tuple, those that actually clicked on ``stat_date``.

    NOTE: relies on module-level globals ``path``, ``stat_date`` and
    ``cid_type`` set in the __main__ block.
    """
    df = pd.read_csv(path + "{}predictTail6Unique.csv".format(stat_date))
    # The "list" cell stores a Python-literal list of device ids.
    pre_list = tuple(eval(df.loc[0, "list"]))
    print(len(pre_list))
    print(pre_list[:2])
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct device_id from data_feed_click \
       where stat_date='{}' \
       and cid_type='{}' \
       and device_id in {}".format(stat_date, cid_type, pre_list)
    cursor = db.cursor()
    print("开始获取")
    cursor.execute(sql)
    print("成功获取")
    rows = cursor.fetchall()
    db.close()
    print(pd.DataFrame(list(rows)).empty)
    user = tuple(pd.DataFrame(list(rows))[0].values.tolist())
    print("用户个数")
    print(len(user))
    return user
if __name__ == "__main__":
    path = "/data/models/"
    cid_type = "diary"
    # stat_date is yesterday, truncated to midnight, formatted YYYY-MM-DD.
    now = datetime.now()
    yesterday = datetime(now.year, now.month, now.day) - timedelta(days=1)
    stat_date = yesterday.strftime("%Y-%m-%d")
    print(stat_date)
    # Report CTR for the predicted tail-6 devices, then for tail-8 devices.
    tail6 = get_tail6()
    get_ctr(tail6)
    tail8 = get_tail8()
    get_ctr(tail8)
diaryUpdateOnlineOffline.py
View file @
f154eb8d
...
...
@@ -13,7 +13,7 @@ from userProfile import get_active_users
from
sklearn.preprocessing
import
MinMaxScaler
import
time
from
config
import
*
import
socket
from
utils
import
judge_online
,
con_sql
def
get_video_id
(
cache_video_id
):
...
...
@@ -38,6 +38,8 @@ def get_video_id(cache_video_id):
return
cache_video_id
else
:
video_id
=
df
[
0
]
.
values
.
tolist
()
print
(
"videoid"
)
print
(
video_id
[:
2
])
return
video_id
...
...
@@ -110,18 +112,18 @@ def save_result(queue_name,queue_arg,device_id):
def get_score(queue_arg):
    """Fetch diary scores for the diaries in ``queue_arg[2]``.

    Connects to the online or local score DB depending on the module-level
    global ``flag``; returns a DataFrame of (score, diary_id) rows via
    ``con_sql`` (which also closes the connection).
    """
    if flag:
        db = pymysql.connect(host=SCORE_DB_ONLINE["host"],
                             port=SCORE_DB_ONLINE["port"],
                             user=SCORE_DB_ONLINE["user"],
                             passwd=SCORE_DB_ONLINE["passwd"],
                             db=SCORE_DB_ONLINE["db"])
    else:
        db = pymysql.connect(host=SCORE_DB_LOCAL["host"],
                             port=SCORE_DB_LOCAL["port"],
                             user=SCORE_DB_LOCAL["user"],
                             passwd=SCORE_DB_LOCAL["passwd"],
                             db=SCORE_DB_LOCAL["db"])
    # Strip the "diary|" prefix (6 chars) from every diary id.
    diary_list = tuple(list(map(lambda x: x[6:], queue_arg[2])))
    sql = "select score,diary_id from biz_feed_diary_score where diary_id in {};".format(diary_list)
    score_df = con_sql(db, sql)
    print("get score")
    return score_df
...
...
@@ -177,7 +179,6 @@ def update_sql_dairy_queue(queue_name, diary_id,device_id, city_id):
db
=
'doris_prod'
)
else
:
db
=
pymysql
.
connect
(
host
=
LOCAL_HOST
,
port
=
3306
,
user
=
'work'
,
passwd
=
'workwork'
,
db
=
'doris_test'
)
cursor
=
db
.
cursor
()
id_str
=
str
(
diary_id
[
0
])
for
i
in
range
(
1
,
len
(
diary_id
)):
...
...
@@ -206,27 +207,28 @@ def queue_compare(old_list, new_list):
for
key
in
x_dict
.
keys
():
if
x_dict
[
key
]
!=
y_dict
[
key
]:
i
+=
1
if
i
>
0
:
print
(
"日记队列更新前日记总个数{},位置发生变化个数{},发生变化率{}
%
"
.
format
(
len
(
old_list
),
i
,
round
(
i
/
len
(
old_list
)
*
100
),
2
))
return
True
else
:
return
False
def
get_queue
(
device_id
,
city_id
,
queue_name
):
if
flag
:
db
=
pymysql
.
connect
(
host
=
QUEUE_ONLINE_HOST
,
port
=
3306
,
user
=
'doris'
,
passwd
=
'o5gbA27hXHHm'
,
db
=
'doris_prod'
)
else
:
db
=
pymysql
.
connect
(
host
=
LOCAL_HOST
,
port
=
3306
,
user
=
'work'
,
passwd
=
'workwork'
,
db
=
'doris_test'
)
cursor
=
db
.
cursor
()
sql
=
"select {} from device_diary_queue "
\
"where device_id = '{}' and city_id = '{}';"
.
format
(
queue_name
,
device_id
,
city_id
)
cursor
.
execute
(
sql
)
result
=
cursor
.
fetchall
()
df
=
pd
.
DataFrame
(
list
(
result
))
if
df
.
empty
:
print
(
"该用户对应的日记为空"
)
return
False
...
...
@@ -260,12 +262,11 @@ def user_update(device_id, city_id, queue_name,data_set_cid,total_video_id):
queue_arg
=
[
queue_predict
,
queue_not_predict
,
queue_list
]
if
queue_predict
!=
[]:
diary_queue
=
pipe_line
(
queue_name
,
queue_arg
,
device_id
,
total_video_id
)
if
diary_queue
:
if
diary_queue
and
queue_compare
(
queue_list
,
diary_queue
)
:
update_sql_dairy_queue
(
queue_name
,
diary_queue
,
device_id
,
city_id
)
queue_compare
(
queue_list
,
diary_queue
)
# print("更新结束")
print
(
"更新结束"
)
else
:
print
(
"获取的日记列表是空,所以不更新日记队列"
)
print
(
"获取的日记列表是空
或者日记队列顺序没有变化
,所以不更新日记队列"
)
else
:
print
(
"预测集是空,不需要预测"
)
else
:
...
...
@@ -274,6 +275,7 @@ def user_update(device_id, city_id, queue_name,data_set_cid,total_video_id):
def
multi_proecess_update
(
device_id
,
city_id
,
data_set_cid
,
total_video_id
):
queue_name_list
=
[
"native_queue"
,
"nearby_queue"
,
"nation_queue"
,
"megacity_queue"
]
pool
=
Pool
(
4
)
for
queue_name
in
queue_name_list
:
pool
.
apply_async
(
user_update
,
(
device_id
,
city_id
,
queue_name
,
data_set_cid
,
total_video_id
,))
...
...
@@ -283,31 +285,31 @@ def multi_proecess_update(device_id, city_id, data_set_cid,total_video_id):
if
__name__
==
"__main__"
:
warnings
.
filterwarnings
(
"ignore"
)
flag
=
True
path
=
DIRECTORY_PATH
# 下面这个ip是本地电脑ip
if
socket
.
gethostbyname
(
socket
.
gethostname
())
==
'172.30.8.160'
:
flag
=
False
path
=
LOCAL_DIRCTORY
flag
,
path
=
judge_online
()
# 增加缓存日记视频列表
cache_video_id
=
[]
cache_device_city_list
=
[]
differ
=
0
while
True
:
data_set_cid
=
pd
.
read_csv
(
path
+
"data_set_cid.csv"
)[
"cid"
]
.
values
.
tolist
()
total_video_id
=
get_video_id
(
cache_video_id
)
cache_video_id
=
total_video_id
start
=
time
.
time
()
device_city_list
=
get_active_users
(
flag
,
path
,
differ
)
device_city_list
=
get_active_users
(
flag
,
path
,
differ
)
time1
=
time
.
time
()
print
(
"获取用户活跃表耗时:{}秒"
.
format
(
time1
-
start
))
# 过滤掉5分钟内预测过的用户
device_city_list
=
list
(
set
(
tuple
(
device_city_list
))
-
set
(
tuple
(
cache_device_city_list
)))
print
(
"device_city_list"
)
print
(
device_city_list
)
if
datetime
.
now
()
.
minute
%
5
==
0
:
cache_device_city_list
=
[]
if
device_city_list
!=
[]:
data_set_cid
=
pd
.
read_csv
(
path
+
"data_set_cid.csv"
)[
"cid"
]
.
values
.
tolist
()
total_video_id
=
get_video_id
(
cache_video_id
)
cache_video_id
=
total_video_id
cache_device_city_list
.
extend
(
device_city_list
)
for
device_city
in
device_city_list
:
multi_proecess_update
(
device_city
[
0
],
device_city
[
1
],
data_set_cid
,
total_video_id
)
differ
=
time
.
time
()
-
start
print
(
"differ:{}秒"
.
format
(
differ
))
...
...
local/ctr.py
0 → 100644
View file @
f154eb8d
import pandas as pd
import pymysql

# Merge the two saved tail-6 prediction lists for 2018-09-02 and compute
# click/exposure counts (and CTR) for those devices.
df = pd.read_csv(r"/data2/models/2018-09-02predictTail6Unique.csv")
# NOTE(review): eval on CSV content — acceptable only because this is a
# locally produced artifact; never do this on untrusted input.
a = eval(df.loc[0, "list"])
a = list(map(lambda x: x[0], a))
print(len(a))
print(a[:2])
cf = pd.read_csv(r"/data2/models/nvwa-2018-09-02predictTail6Unique.csv")
b = eval(cf.loc[0, "list"])
print(len(b))
print(b[:2])
a.extend(b)
print("个数")
print(len(set(a)))
pre_list = list(set(a))
print(pre_list[:2])
stat_date = "2018-09-02"
cid_type = "diary"
db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                     passwd='3SYz54LS9#^9sBvC', db='jerry_test')
# BUG FIX: "in {}" must be formatted with a tuple; formatting a list renders
# as "in ['x', ...]", which is invalid SQL.
# NOTE(review): both queries below hit data_feed_exposure2, yet the first
# result is named `click` — it probably should query a click table; confirm.
sql = "select count(device_id) from data_feed_exposure2 \
   where stat_date='{}' \
   and cid_type='{}' \
   and device_id in {}".format(stat_date, cid_type, tuple(pre_list))
cursor = db.cursor()
print("开始获取")
cursor.execute(sql)
print("成功获取")
click = cursor.fetchall()[0][0]
print(click)
sql = "select count(device_id) from data_feed_exposure2 \
   where stat_date='{}' \
   and cid_type='{}' \
   and device_id in {}".format(stat_date, cid_type, tuple(pre_list))
cursor = db.cursor()
print("开始获取")
cursor.execute(sql)
exp = cursor.fetchall()[0][0]
db.close()  # fix: the connection was never closed
print(exp)
# BUG FIX: avoid ZeroDivisionError when there were no exposures.
print(click / exp if exp else 0)
local/testCases.py
View file @
f154eb8d
...
...
@@ -39,23 +39,66 @@ def get_local_device():
df
.
to_csv
(
'/Users/mac/utils/test_device_city_id.csv'
,
index
=
None
)
print
(
1
)
if
__name__
==
"__main__"
:
LOCAL_HOST
=
'rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com'
db
=
pymysql
.
connect
(
host
=
LOCAL_HOST
,
port
=
3306
,
user
=
'work'
,
passwd
=
'workwork'
,
db
=
'doris_test'
)
diary_id
=
[
14207355
,
16197023
,
13006945
,
12363565
,
15296547
,
15082216
,
16198052
,
15228350
,
13006942
,
14229868
,
15303563
,
16211116
,
15225921
,
15250715
,
15271108
,
15479315
,
16197047
,
15544727
,
15336944
,
15486003
,
15517196
,
16211130
,
15547275
,
15572010
]
device_id
=
'99000645287876'
city_id
=
'beijing'
def save_queue():
    """Dump each of the four diary queues of the global device/city to a CSV
    under /data/models/, one file per queue name."""
    for queue_name in ["native_queue", "nearby_queue", "nation_queue", "megacity_queue"]:
        sql = "select {} from device_diary_queue " \
              "where device_id = '{}' and city_id = '{}';".format(queue_name, device_id, city_id)
        conn = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com',
                               port=3306, user='doris', passwd='o5gbA27hXHHm',
                               db='doris_prod')
        cur = conn.cursor()
        cur.execute(sql)
        frame = pd.DataFrame(list(cur.fetchall()))
        print(frame.shape)
        frame.to_csv("/data/models/{}.csv".format(queue_name), index=None)
    print("end")
def delete():
    """Delete the device_diary_queue row for the global ``device_id`` and
    ``city_id`` (set in the __main__ block)."""
    db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com',
                         port=3306, user='doris', passwd='o5gbA27hXHHm',
                         db='doris_prod')
    cursor = db.cursor()
    sql = "delete from device_diary_queue where device_id = '{}' and city_id = '{}';".format(device_id, city_id)
    cursor.execute(sql)
    # BUG FIX: pymysql does not autocommit; without commit() the DELETE was
    # silently discarded when the connection closed.
    db.commit()
    db.close()
    print("删除成功")
    # Removed dead code: an "insert into device_diary_queue ..." statement
    # (and the id_str it consumed) was built here after the connection was
    # already closed and never executed.
def insert():
    """Insert a device_diary_queue row assembled from the per-queue CSVs
    written by save_queue().

    Relies on the module-level globals ``id``, ``device_id`` and ``city_id``.
    """
    queue_name_list = ["native_queue", "nearby_queue", "nation_queue", "megacity_queue"]
    a = {}
    for i in queue_name_list:
        # BUG FIX: read each queue's own CSV; previously every entry was read
        # from native_queue.csv, so all four columns got the native queue.
        a[i] = pd.read_csv("/data/models/{}.csv".format(i))["0"].values.tolist()[0]
    # columns = ["native_queue", "nearby_queue", "nation_queue", "megacity_queue","id","device_id","city_id"]
    db = pymysql.connect(host='rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com',
                         port=3306, user='doris', passwd='o5gbA27hXHHm',
                         db='doris_prod')
    sql = "INSERT INTO device_diary_queue (native_queue, nearby_queue, nation_queue, " \
          "megacity_queue,id,device_id,city_id) VALUES ('{}','{}','{}','{}','{}','{}','{}');".format \
        (a["native_queue"], a["nearby_queue"], a["nation_queue"], a["megacity_queue"], id, device_id, city_id)
    cursor = db.cursor()
    cursor.execute(sql)
    db.commit()
    db.close()
    print("成功写入diary_id")
    print("end")
if __name__ == "__main__":
    # Procedure: first save the current queues (save_queue), then delete the
    # row (delete), then re-insert the saved data (insert).
    id = 334
    device_id = '00CA20EB-2719-4518-85CC-60E765AC526F'
    city_id = 'beijing'
    save_queue()
    delete()
    insert()
...
...
local/utils.py
View file @
f154eb8d
...
...
@@ -165,65 +165,67 @@ class multiFFMFormatPandas:
return
False
# ffm 格式转换函数、类
# class FFMFormatPandas:
# def __init__(self):
# self.field_index_ = None
# self.feature_index_ = None
# self.y = None
#
# def fit(self, df, y=None):
# self.y = y
# df_ffm = df[df.columns.difference([self.y])]
# if self.field_index_ is None:
# self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
#
# if self.feature_index_ is not None:
# last_idx = max(list(self.feature_index_.values()))
#
# if self.feature_index_ is None:
# self.feature_index_ = dict()
# last_idx = 0
#
# for col in df.columns:
# vals = df[col].unique()
# for val in vals:
# if pd.isnull(val):
# continue
# name = '{}_{}'.format(col, val)
# if name not in self.feature_index_:
# self.feature_index_[name] = last_idx
# last_idx += 1
# self.feature_index_[col] = last_idx
# last_idx += 1
# return self
#
# def fit_transform(self, df, y=None):
# self.fit(df, y)
# return self.transform(df)
#
# def transform_row_(self, row, t):
# ffm = []
# if self.y is not None:
# ffm.append(str(row.loc[row.index == self.y][0]))
# if self.y is None:
# ffm.append(str(0))
#
# for col, val in row.loc[row.index != self.y].to_dict().items():
# col_type = t[col]
# name = '{}_{}'.format(col, val)
# if col_type.kind == 'O':
# ffm.append('{}:{}:1'.format(self.field_index_[col], self.feature_index_[name]))
# elif col_type.kind == 'i':
# ffm.append('{}:{}:{}'.format(self.field_index_[col], self.feature_index_[col], val))
# return ' '.join(ffm)
#
# def transform(self, df):
# t = df.dtypes.to_dict()
# return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})
#
# # 下面这个方法不是这个类原有的方法,是新增的。目的是用来判断这个用户是不是在训练数据集中存在
# def is_feature_index_exist(self, name):
# if name in self.feature_index_:
# return True
# else:
# return False
class FFMFormatPandas:
    """Encode a pandas DataFrame into the libffm text format.

    Each output row is ``<label> <field>:<feature>:<value> ...`` where
    field ids come from column positions and feature ids from
    ``"<column>_<value>"`` pairs seen during fit.
    """

    def __init__(self):
        self.field_index_ = None    # column name -> field id (label column excluded)
        self.feature_index_ = None  # "<col>_<val>" (and bare col) -> feature id
        self.y = None               # name of the label column

    def fit(self, df, y=None):
        """Build the field and feature indices from ``df``.

        :param df: training DataFrame (may include the label column).
        :param y: name of the label column, or None.
        :return: self, to allow chaining.
        """
        self.y = y
        df_ffm = df[df.columns.difference([self.y])]
        if self.field_index_ is None:
            # e.g. field_index_ = {name: 0, age: 1}
            self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
        if self.feature_index_ is not None:
            last_idx = max(list(self.feature_index_.values()))
        if self.feature_index_ is None:
            self.feature_index_ = dict()
            last_idx = 0
        # BUG FIX: iterate only the feature columns. The original looped over
        # df.columns, so the label column y was also indexed as a feature —
        # the original code carried a comment acknowledging this bug.
        for col in df_ffm.columns:
            vals = df[col].unique()
            for val in vals:
                if pd.isnull(val):
                    continue
                name = '{}_{}'.format(col, val)
                if name not in self.feature_index_:
                    # feature_index_ = {name_tom: 0, name_lily: 1, name: 2, ...}
                    self.feature_index_[name] = last_idx
                    last_idx += 1
            self.feature_index_[col] = last_idx
            last_idx += 1
        return self

    def fit_transform(self, df, y=None):
        """Fit on ``df`` and return its libffm encoding."""
        self.fit(df, y)
        return self.transform(df)

    def transform_row_(self, row, t):
        """Encode one row (a Series) using dtype map ``t``; returns a string."""
        ffm = []
        if self.y is not None:
            # .iloc[0] instead of [0]: positional integer lookup on a
            # label-indexed Series is deprecated in modern pandas.
            ffm.append(str(row.loc[row.index == self.y].iloc[0]))
        if self.y is None:
            ffm.append(str(0))
        for col, val in row.loc[row.index != self.y].to_dict().items():
            col_type = t[col]
            name = '{}_{}'.format(col, val)
            if col_type.kind == 'O':
                # Categorical: one-hot style, value fixed at 1.
                ffm.append('{}:{}:1'.format(self.field_index_[col], self.feature_index_[name]))
            elif col_type.kind == 'i':
                # Integer: the column's own feature id carries the raw value.
                ffm.append('{}:{}:{}'.format(self.field_index_[col], self.feature_index_[col], val))
        return ' '.join(ffm)

    def transform(self, df):
        """Return a Series of libffm-encoded strings, indexed like ``df``."""
        t = df.dtypes.to_dict()
        return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})

    # Not part of the original class; added to check whether a feature name
    # was seen during fit (i.e. whether this user exists in the training set).
    def is_feature_index_exist(self, name):
        if name in self.feature_index_:
            return True
        else:
            return False
prepareData.py
View file @
f154eb8d
from
utils
import
con_sql
import
datetime
import
time
import
pymysql
def
fetch_data
(
start_date
,
end_date
):
# 获取点击表里的device_id
sql
=
"select distinct device_id from data_feed_click2"
click_device_id
=
con_sql
(
sql
)[
0
]
.
values
.
tolist
()
db
=
pymysql
.
connect
(
host
=
'10.66.157.22'
,
port
=
4000
,
user
=
'root'
,
passwd
=
'3SYz54LS9#^9sBvC'
,
db
=
'jerry_test'
)
sql
=
"select distinct device_id from data_feed_click"
click_device_id
=
con_sql
(
db
,
sql
)[
0
]
.
values
.
tolist
()
print
(
"成功获取点击表里的device_id"
)
# 获取点击表里的数据
sql
=
"select cid,device_id,time,stat_date from data_feed_click
2
"
\
sql
=
"select cid,device_id,time,stat_date from data_feed_click "
\
"where stat_date >= '{0}' and stat_date <= '{1}'"
.
format
(
start_date
,
end_date
)
click
=
con_sql
(
sql
)
# 因为上面的db已经关了,需要再写一遍
db
=
pymysql
.
connect
(
host
=
'10.66.157.22'
,
port
=
4000
,
user
=
'root'
,
passwd
=
'3SYz54LS9#^9sBvC'
,
db
=
'jerry_test'
)
click
=
con_sql
(
db
,
sql
)
click
=
click
.
rename
(
columns
=
{
0
:
"cid"
,
1
:
"device_id"
,
2
:
"time_date"
,
3
:
"stat_date"
})
print
(
"成功获取点击表里的数据"
)
# 从time特征中抽取hour
...
...
@@ -22,10 +24,12 @@ def fetch_data(start_date, end_date):
click
=
click
.
drop
(
"time_date"
,
axis
=
1
)
# 获取曝光表里的数据
sql
=
"select cid,device_id,time,stat_date from data_feed_exposure
2
"
\
sql
=
"select cid,device_id,time,stat_date from data_feed_exposure "
\
"where stat_date >= '{0}' and stat_date <= '{1}'"
.
format
(
start_date
,
end_date
)
start
=
time
.
time
()
exposure
=
con_sql
(
sql
)
# 因为上面的db已经关了,需要再写一遍
db
=
pymysql
.
connect
(
host
=
'10.66.157.22'
,
port
=
4000
,
user
=
'root'
,
passwd
=
'3SYz54LS9#^9sBvC'
,
db
=
'jerry_test'
)
exposure
=
con_sql
(
db
,
sql
)
end
=
time
.
time
()
print
(
"获取曝光表耗时{}分"
.
format
((
end
-
start
)
/
60
))
exposure
=
exposure
.
rename
(
columns
=
{
0
:
"cid"
,
1
:
"device_id"
,
2
:
"time_date"
,
3
:
"stat_date"
})
...
...
train.py
View file @
f154eb8d
...
...
@@ -18,8 +18,7 @@ if __name__ == "__main__":
end_train
=
time
.
time
()
print
(
"训练模型耗时{}分"
.
format
((
end_train
-
start_train
)
/
60
))
move_file
()
#TODO 如果用自己写的keepProcess文件守护进程,下面在这个函数里删掉重新启动进程那行代码,因为可能会多启动一次进程
# restart_process()
...
...
userProfile.py
View file @
f154eb8d
...
...
@@ -8,6 +8,37 @@ import pymysql
import
time
# 统计尾号6的活跃用户数
def unique_user_count(file_path, temp_list, now):
    """Accumulate the distinct active device ids ending in "6" into a CSV.

    Merges ids already recorded in ``file_path`` (if present) with the
    tail-6 ids found in ``temp_list``, rewrites the CSV snapshot, and
    prints the running distinct count.
    """
    if os.path.exists(file_path):
        # Previously recorded tail-6 ids, stored as a Python-literal list.
        tail_6_list = eval(pd.read_csv(file_path).loc[0, "list"])
    else:
        tail_6_list = []
    tail_6_list.extend([x for x in temp_list if str(x)[-1] == "6"])
    if tail_6_list != []:
        unique_ids = set(tail_6_list)
        snapshot = pd.DataFrame({"number": [len(unique_ids)],
                                 "time": [str(now)[:16]],
                                 "list": [list(unique_ids)]})
        snapshot.to_csv(file_path, index=None)
    print("截止现在尾号是6的独立活跃数:{}".format(len(set(tail_6_list))))
# 统计预测过的独立用户数
def predict_user_count(predict_file_path, device_list, now):
    """Accumulate the distinct devices predicted so far into a CSV.

    Merges ids already recorded in ``predict_file_path`` (if present) with
    ``device_list``, rewrites the CSV snapshot, and prints the running
    distinct count.
    """
    if os.path.exists(predict_file_path):
        # Previously recorded predicted ids, stored as a Python-literal list.
        all_predict_list = eval(pd.read_csv(predict_file_path).loc[0, "list"])
    else:
        all_predict_list = []
    all_predict_list.extend(device_list)
    if all_predict_list != []:
        unique_devices = set(all_predict_list)
        snapshot = pd.DataFrame({"number": [len(unique_devices)],
                                 "time": [str(now)[:16]],
                                 "list": [list(unique_devices)]})
        snapshot.to_csv(predict_file_path, index=None)
    print("截止现在预测过尾号是6的独立活跃数:{}".format(len(set(all_predict_list))))
# 获取当下一分钟内活跃用户
def
get_active_users
(
flag
,
path
,
differ
):
if
differ
==
0
:
...
...
@@ -23,18 +54,18 @@ def get_active_users(flag,path,differ):
start
=
end
-
differ
end_datetime
=
str
(
datetime
.
fromtimestamp
(
end
))
start_datetime
=
str
(
datetime
.
fromtimestamp
(
start
))
sql
=
"select device_id,city_id from user_active_time "
\
"where active_time <= '{}' and active_time >= '{}'"
.
format
(
end_datetime
,
start_datetime
)
if
flag
:
df
=
con_sql
(
sql
)
sql
=
"select device_id,city_id from user_active_time "
\
"where active_time <= '{}' and active_time >= '{}'"
.
format
(
end_datetime
,
start_datetime
)
db
=
pymysql
.
connect
(
host
=
ACTIVE_USER_DB_ONLINE
[
"host"
],
port
=
ACTIVE_USER_DB_ONLINE
[
"port"
],
user
=
ACTIVE_USER_DB_ONLINE
[
"user"
],
passwd
=
ACTIVE_USER_DB_ONLINE
[
"passwd"
],
db
=
ACTIVE_USER_DB_ONLINE
[
"db"
])
df
=
con_sql
(
db
,
sql
)
else
:
db
=
pymysql
.
connect
(
host
=
'192.168.15.12'
,
port
=
4000
,
user
=
'root'
,
db
=
'jerry_test'
)
db
=
pymysql
.
connect
(
host
=
ACTIVE_USER_DB_LOCAL
[
"host"
],
port
=
ACTIVE_USER_DB_LOCAL
[
"port"
],
user
=
ACTIVE_USER_DB_LOCAL
[
"user"
],
db
=
ACTIVE_USER_DB_LOCAL
[
"db"
])
sql
=
"select device_id,city_id from user_active_time"
cursor
=
db
.
cursor
()
cursor
.
execute
(
sql
)
result
=
cursor
.
fetchall
()
df
=
pd
.
DataFrame
(
list
(
result
))
.
dropna
()
db
.
close
()
df
=
con_sql
(
db
,
sql
)
if
df
.
empty
:
print
(
"当下没有活跃用户数"
)
...
...
@@ -44,19 +75,20 @@ def get_active_users(flag,path,differ):
temp_list
=
df
[
0
]
.
values
.
tolist
()
now
=
datetime
.
now
()
tail6_file_path
=
path
+
"{}tail6Unique.csv"
.
format
(
str
(
now
)[:
10
])
if
os
.
path
.
exists
(
tail6_file_path
):
# 尾号是6的活跃用户数
tail_6_list
=
eval
(
pd
.
read_csv
(
tail6_file_path
)
.
loc
[
0
,
"list"
])
else
:
tail_6_list
=
[]
tail_6_list
.
extend
(
list
(
filter
(
lambda
x
:
(
str
(
x
)[
-
1
]
==
"6"
),
temp_list
)))
if
tail_6_list
!=
[]:
df_tail_6
=
pd
.
DataFrame
({
"number"
:
[
len
(
set
(
tail_6_list
))],
"time"
:
[
str
(
now
)[:
16
]],
"list"
:
[
list
(
set
(
tail_6_list
))]})
df_tail_6
.
to_csv
(
tail6_file_path
,
index
=
None
)
print
(
"截止现在尾号是6的独立活跃数:{}"
.
format
(
len
(
set
(
tail_6_list
))))
unique_user_count
(
tail6_file_path
,
temp_list
,
now
)
# if os.path.exists(tail6_file_path):
# # 尾号是6的活跃用户数
# tail_6_list = eval(pd.read_csv(tail6_file_path).loc[0, "list"])
# else:
# tail_6_list = []
#
# tail_6_list.extend(list(filter(lambda x: (str(x)[-1] == "6"), temp_list)))
# if tail_6_list != []:
# df_tail_6 = pd.DataFrame({"number": [len(set(tail_6_list))], "time": [str(now)[:16]],
# "list": [list(set(tail_6_list))]})
# df_tail_6.to_csv(tail6_file_path, index=None)
#
# print("截止现在尾号是6的独立活跃数:{}".format(len(set(tail_6_list))))
old_device_id_list
=
pd
.
read_csv
(
path
+
"data_set_device_id.csv"
)[
"device_id"
]
.
values
.
tolist
()
# 求活跃用户和老用户的交集,也就是只预测老用户
df
=
df
.
loc
[
df
[
0
]
.
isin
(
old_device_id_list
)]
...
...
@@ -83,23 +115,25 @@ def get_active_users(flag,path,differ):
#统计尾号6的预测用户
predict_file_path
=
path
+
"{}predictTail6Unique.csv"
.
format
(
str
(
now
)[:
10
])
if
os
.
path
.
exists
(
predict_file_path
):
# 预测过尾号是6的用户数
all_predict_list
=
eval
(
pd
.
read_csv
(
predict_file_path
)
.
loc
[
0
,
"list"
])
else
:
all_predict_list
=
[]
all_predict_list
.
extend
(
device_list
)
if
all_predict_list
!=
[]:
df_predict
=
pd
.
DataFrame
({
"number"
:
[
len
(
set
(
all_predict_list
))],
"time"
:
[
str
(
now
)[:
16
]],
"list"
:
[
list
(
set
(
all_predict_list
))]})
df_predict
.
to_csv
(
predict_file_path
,
index
=
None
)
predict_user_count
(
predict_file_path
,
device_list
,
now
)
# if os.path.exists(predict_file_path):
# # 预测过尾号是6的用户数
# all_predict_list = eval(pd.read_csv(predict_file_path).loc[0, "list"])
# else:
# all_predict_list = []
# all_predict_list.extend(device_list)
# if all_predict_list != []:
# df_predict = pd.DataFrame({"number": [len(set(all_predict_list))], "time": [str(now)[:16]],
# "list": [list(set(all_predict_list))]})
# df_predict.to_csv(predict_file_path, index=None)
return
device_city_list
def
fetch_user_profile
(
device_id
):
db
=
pymysql
.
connect
(
host
=
'10.66.157.22'
,
port
=
4000
,
user
=
'root'
,
passwd
=
'3SYz54LS9#^9sBvC'
,
db
=
'jerry_test'
)
sql
=
"select device_id,city_id from data_feed_click where device_id = '{0}' limit 1"
.
format
(
device_id
)
user_profile
=
con_sql
(
sql
)
user_profile
=
con_sql
(
db
,
sql
)
if
user_profile
.
empty
:
print
(
"没有获取到该用户对应的city_id"
)
return
None
,
True
...
...
utils.py
View file @
f154eb8d
...
...
@@ -10,6 +10,18 @@ from multiprocessing import Pool
import
os
import
signal
from
config
import
*
import
socket
def judge_online():
    """Return ``(flag, path)``: flag is True when running online, and path
    is the matching model directory from config."""
    # The ip below is the local development machine.
    is_local = socket.gethostbyname(socket.gethostname()) == '172.30.8.160'
    if is_local:
        return False, LOCAL_DIRCTORY
    return True, DIRECTORY_PATH
def
get_date
():
...
...
@@ -19,11 +31,11 @@ def get_date():
day
=
now
.
day
date
=
datetime
(
year
,
month
,
day
)
data_start_date
=
"2018-07-15"
data_end_date
=
"2018-08-30
"
validation_date
=
"2018-08-29
"
# data_end_date = "2018-09-02
"
# validation_date = "2018-09-01
"
# data_start_date = (date - timedelta(days=3)).strftime("%Y-%m-%d")
#
data_end_date = (date - timedelta(days=1)).strftime("%Y-%m-%d")
#
validation_date = (date - timedelta(days=2)).strftime("%Y-%m-%d")
data_end_date
=
(
date
-
timedelta
(
days
=
1
))
.
strftime
(
"
%
Y-
%
m-
%
d"
)
validation_date
=
(
date
-
timedelta
(
days
=
2
))
.
strftime
(
"
%
Y-
%
m-
%
d"
)
# 验证集和测试集的日期必须相差一天,否则切割数据集时会报错
test_date
=
data_end_date
print
(
"data_start_date,data_end_date,validation_date,test_date:"
)
...
...
@@ -40,14 +52,33 @@ def get_roc_curve(y, pred, pos_label):
print
(
AUC
)
# 从Tidb数据库的表里获取数据,并转化成df格式
def
con_sql
(
sql
):
db
=
pymysql
.
connect
(
host
=
'10.66.157.22'
,
port
=
4000
,
user
=
'root'
,
passwd
=
'3SYz54LS9#^9sBvC'
,
db
=
'jerry_test'
)
# Fetch rows from the TiDB database and convert them to a DataFrame,
# dropping rows that contain NaN.
def con_sql(db, sql):
    """Execute ``sql`` on connection ``db`` and return the result as a
    DataFrame with NaN rows dropped.

    Always closes ``db``; on any error prints it and returns an empty
    DataFrame instead of raising.
    """
    cursor = db.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
        df = pd.DataFrame(list(result)).dropna()
    except Exception as e:
        # BUG FIX: print the caught exception instance, not the bare
        # Exception class (which carried no information about the failure).
        print("发生异常", e)
        df = pd.DataFrame()
    finally:
        db.close()
    return df
# Same as con_sql above, except this one does NOT drop NaN rows.
def sql_df(db, sql):
    """Execute ``sql`` on connection ``db`` and return the raw result as a
    DataFrame (NaN rows kept).

    Always closes ``db``; on any error prints it and returns an empty
    DataFrame instead of raising.
    """
    cursor = db.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
        df = pd.DataFrame(list(result))
    except Exception as e:
        # BUG FIX: print the caught exception instance, not the bare
        # Exception class (which carried no information about the failure).
        print("发生异常", e)
        df = pd.DataFrame()
    finally:
        db.close()
    return df
...
...
@@ -70,7 +101,7 @@ def restart_process():
except
OSError
:
print
(
'没有如此进程!!!'
)
os
.
popen
(
'python diaryUpdateOnlineOffline.py'
)
print
(
"成功重启diaryUpdateOnlineOffline.py"
)
print
(
"
已经
成功重启diaryUpdateOnlineOffline.py"
)
else
:
os
.
popen
(
'python diaryUpdateOnlineOffline.py'
)
print
(
"成功重启diaryUpdateOnlineOffline.py"
)
...
...
@@ -252,7 +283,7 @@ class multiFFMFormatPandas:
# t = df.dtypes.to_dict()
# return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})
#
#
#
下面这个方法不是这个类原有的方法,是新增的。目的是用来判断这个用户是不是在训练数据集中存在
# 下面这个方法不是这个类原有的方法,是新增的。目的是用来判断这个用户是不是在训练数据集中存在
# def is_feature_index_exist(self, name):
# if name in self.feature_index_:
# return True
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment