ML / ffm-baseline · Commits

Commit 96b96294, authored Aug 21, 2018 by 张彦钊
Parent: 6bc7b2c7

    add test file

Showing 3 changed files, with 264 additions and 25 deletions (+264 −25):

    local/precitDIaryLocal.py   +21  −9
    local/testCases.py          +9   −16
    local/utils.py              +234 −0
local/precitDIaryLocal.py — View file @ 96b96294

 import pickle
 import xlearn as xl
-from utils import *
+import pandas as pd
+import pymysql
+from datetime import datetime
+import utils

 # Local test script
...
@@ -21,9 +24,14 @@ def test_con_sql(device_id):
         nearby_queue = df.loc[0, "nearby_queue"].split(",")
         nation_queue = df.loc[0, "nation_queue"].split(",")
         megacity_queue = df.loc[0, "megacity_queue"].split(",")
         db.close()
         print(native_queue)
+        print(nearby_queue)
+        print(nation_queue)
+        print(megacity_queue)
         return native_queue, nearby_queue, nation_queue, megacity_queue
     else:
         print("该用户对应的日记队列为空")  # this user's diary queues are empty

 # Join device_id and city_id onto the corresponding city's hot-diary table.
 # Note: the feature order of the prediction set below must match the training set.
...
@@ -40,6 +48,8 @@ def feature_en(x_list, device_id):
     data["minute"] = data["minute"].astype("category")
     # y is what we predict, but the ffm conversion needs a y column; it does not affect the prediction result
     data["y"] = 0
+    data.to_csv("/Users/mac/utils/result/data.csv")
+    print(data)
     return data
...
@@ -57,7 +67,7 @@ def transform_ffm_format(df, device_id):

 # Load the model, predict, sort the predicted diary probabilities in descending order, and store them in a table
 def predict(queue_name, x_list, device_id):
-    data = feature_en(x_list)
+    data = feature_en(x_list, device_id)
     data_file_path = transform_ffm_format(data, device_id)
     ffm_model = xl.create_ffm()
...
@@ -156,12 +166,14 @@ def get_update_time():

 if __name__ == "__main__":
     # The database has no update-time column, so the code below cannot be used
     # sql_update_time_start = get_update_time()
-    native_queue_list, nearby_queue_list, nation_queue_list, megacity_queue_list = test_con_sql("device_id")
+    device_id = "358035085192742"
+    native_queue_list, nearby_queue_list, nation_queue_list, megacity_queue_list = test_con_sql(device_id)
     name_dict = {"native_queue": native_queue_list, "nearby_queue": nearby_queue_list,
                  "nation_queue": nation_queue_list, "megacity_queue": megacity_queue_list}
     for key in name_dict.keys():
-        diary_id = predict(key, name_dict[key], "devcie_id")
+        diary_id = predict(key, name_dict[key], device_id)
-        sql_update_time_end = get_update_time()
+        # sql_update_time_end = get_update_time()
         # The database has no update-time column, so the code below cannot be used
         # if sql_update_time_start == sql_update_time_end:
-        update_sql_dairy_queue(key, diary_id, "device_id")
+        update_sql_dairy_queue(key, diary_id, device_id)
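The diff collapses the rest of predict after xl.create_ffm(). For orientation, here is a minimal sketch of how an xlearn FFM model is typically loaded and scored against the file that transform_ffm_format produces; the model and output paths below are hypothetical, not taken from this repository:

import xlearn as xl

data_file_path = "/data2/models/data.ffm"   # assumed: libffm-format file returned by transform_ffm_format
model_path = "/data2/models/model.out"      # assumed: previously trained FFM model weights
output_path = "/data2/models/output.txt"    # predictions are written here, one probability per row

ffm_model = xl.create_ffm()        # FFM estimator, as in the diff above
ffm_model.setTest(data_file_path)  # point the estimator at the prediction set
ffm_model.setSigmoid()             # squash raw scores into (0, 1) probabilities
ffm_model.predict(model_path, output_path)

The probabilities in output_path line up row-for-row with the diary ids in the queue, which is what lets predict sort the diaries by predicted probability before writing them back.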
local/testCases.py — View file @ 96b96294

-from utils import *
-import datetime
 import pickle
+import utils
+import time
+import pandas as pd

-DIRECTORY_PATH = '/data2/models/'
-with open(DIRECTORY_PATH + "ffm.pkl", "rb") as f:
-    ffm_format_pandas = pickle.load(f)
-df = pd.read_csv("/home/zhangyanzhao/data.csv")
-data = ffm_format_pandas.transform(df)
-data.to_csv("/home/zhangyanzhao/ffm.csv", index=False, header=None)
+if __name__ == '__main__':
+    df = pd.read_csv("/Users/mac/PycharmProjects/nvwa/ffm-baseline/data/test-data/大数据.csv")
+    for i in range(500, 10000, 500):
+        start = time.time()
+        ffm = multiFFMFormatPandas()
+        data = ffm.fit_transform(df, y="y", n=i, processes=3)
+        end = time.time()
+        print("分割单位{}耗时{}".format(i, end - start))  # "chunk size {} took {}"
\ No newline at end of file
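The rewritten test sweeps the chunk size n from 500 to 9500 and prints the wall-clock time of each fit_transform call, which is presumably how the chunk-size / processes=3 trade-off was tuned. A self-contained variant of the same benchmark, using a synthetic DataFrame in place of the private 大数据.csv (the column names here are invented, and utils is assumed to be this commit's local/utils.py on the import path):

import time
import numpy as np
import pandas as pd
from utils import multiFFMFormatPandas

if __name__ == "__main__":
    # Synthetic stand-in: an integer label plus one categorical and one integer feature.
    df = pd.DataFrame({
        "y": np.random.randint(0, 2, 5000),
        "city": np.random.choice(["beijing", "shanghai", "shenzhen"], 5000),
        "hour": np.random.randint(0, 24, 5000),
    })
    for n in range(500, 10000, 500):
        start = time.time()
        multiFFMFormatPandas().fit_transform(df, y="y", n=n, processes=3)
        print("chunk size {}: {:.2f}s".format(n, time.time() - start))

The __main__ guard matters here: transform spawns a multiprocessing Pool, which on spawn-based platforms re-imports the module and would otherwise re-enter the benchmark loop.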
local/utils.py — new file (0 → 100644) — View file @ 96b96294

# -*- coding: utf-8 -*-
from datetime import datetime
from datetime import timedelta
import pymysql
import numpy as np
import redis
import pandas as pd
from sklearn import metrics
from sklearn.metrics import auc
from multiprocessing import Pool


def get_date():
    now = datetime.now()
    year = now.year
    month = now.month
    day = now.day
    date = datetime(year, month, day)
    data_start_date = (date - timedelta(days=31)).strftime("%Y-%m-%d")
    data_end_date = (date - timedelta(days=1)).strftime("%Y-%m-%d")
    validation_date = (date - timedelta(days=2)).strftime("%Y-%m-%d")
    # The validation and test dates must be exactly one day apart, otherwise splitting the dataset raises an error
    test_date = data_end_date
    print("data_start_date,data_end_date,validation_date,test_date:")
    print(data_start_date, data_end_date, validation_date, test_date)
    return data_start_date, data_end_date, validation_date, test_date


def get_roc_curve(y, pred, pos_label):
    """
    Compute the ROC curve and AUC for a binary classification problem
    """
    fpr, tpr, thresholds = metrics.roc_curve(y, pred, pos_label)
    AUC = metrics.auc(fpr, tpr)
    print(AUC)


# Fetch data from a TiDB table and convert it to a DataFrame
def con_sql(sql):
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    df = pd.DataFrame(list(result)).dropna()
    db.close()
    return df


# Write data to redis
# TODO the production redis address has not been provided; the address below is the test environment and must be changed
def add_data_to_redis(key, val):
    r = redis.StrictRedis(host='10.30.50.58', port=6379, db=12)
    r.set(key, val)
    # Set the key's TTL: it expires after 36 hours
    r.expire(key, 36 * 60 * 60)


# Multi-process ffm conversion class:
class multiFFMFormatPandas:
    def __init__(self):
        self.field_index_ = None
        self.feature_index_ = None
        self.y = None

    def fit(self, df, y=None):
        self.y = y
        df_ffm = df[df.columns.difference([self.y])]
        if self.field_index_ is None:
            self.field_index_ = {col: i for i, col in enumerate(df_ffm)}

        if self.feature_index_ is not None:
            last_idx = max(list(self.feature_index_.values()))

        if self.feature_index_ is None:
            self.feature_index_ = dict()
            last_idx = 0

        for col in df.columns:
            vals = df[col].unique()
            for val in vals:
                if pd.isnull(val):
                    continue
                name = '{}_{}'.format(col, val)
                if name not in self.feature_index_:
                    self.feature_index_[name] = last_idx
                    last_idx += 1
            self.feature_index_[col] = last_idx
            last_idx += 1
        return self

    def fit_transform(self, df, y=None, n=50000, processes=4):
        # n is the maximum number of rows each process handles; processes is the number of processes
        self.fit(df, y)
        return self.transform(df, n, processes)

    def transform_row_(self, row, t):
        ffm = []
        if self.y is not None:
            ffm.append(str(row.loc[row.index == self.y][0]))
        if self.y is None:
            ffm.append(str(0))

        for col, val in row.loc[row.index != self.y].to_dict().items():
            col_type = t[col]
            name = '{}_{}'.format(col, val)
            if col_type.kind == 'O':
                ffm.append('{}:{}:1'.format(self.field_index_[col], self.feature_index_[name]))
            elif col_type.kind == 'i':
                ffm.append('{}:{}:{}'.format(self.field_index_[col], self.feature_index_[col], val))
        return ' '.join(ffm)

    def transform(self, df, n=1500, processes=2):
        # n is the maximum number of rows each process handles; processes is the number of processes
        t = df.dtypes.to_dict()
        data_list = self.data_split_line(df, n)

        # Set the number of processes
        pool = Pool(processes)
        print("总进度: " + str(len(data_list)))  # total number of chunks
        for i in range(len(data_list)):
            data_list[i] = pool.apply_async(self.pool_function, (data_list[i], t,))

        result_map = {}
        for i in data_list:
            result_map.update(i.get())
        pool.close()
        pool.join()

        return pd.Series(result_map)

    # Worker run in each process
    def pool_function(self, df, t):
        return {idx: self.transform_row_(row, t) for idx, row in df.iterrows()}

    # Splits the data: takes a dataframe and a chunk-size step, returns a list of dataframes each holding at most step rows
    def data_split_line(self, data, step):
        data_list = []
        x = 0
        while True:
            if x + step < data.__len__():
                data_list.append(data.iloc[x:x + step])
                x = x + step  # iloc's end bound is exclusive, so the next chunk starts at x + step
            else:
                data_list.append(data.iloc[x:data.__len__()])
                break

        '''
        # Generator version, but it was not efficient in local testing
        x = 0
        while True:
            if x + step < data.__len__():
                yield data.iloc[x:x + step]
                x = x + step
            else:
                yield data.iloc[x:data.__len__()]
                break
        '''
        return data_list

    # This method is not part of the original class; it was added to check whether a user exists in the training data set
    def is_feature_index_exist(self, name):
        if name in self.feature_index_:
            return True
        else:
            return False
# ffm format-conversion function and class (older single-process version, kept commented out)
# class FFMFormatPandas:
# def __init__(self):
# self.field_index_ = None
# self.feature_index_ = None
# self.y = None
#
# def fit(self, df, y=None):
# self.y = y
# df_ffm = df[df.columns.difference([self.y])]
# if self.field_index_ is None:
# self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
#
# if self.feature_index_ is not None:
# last_idx = max(list(self.feature_index_.values()))
#
# if self.feature_index_ is None:
# self.feature_index_ = dict()
# last_idx = 0
#
# for col in df.columns:
# vals = df[col].unique()
# for val in vals:
# if pd.isnull(val):
# continue
# name = '{}_{}'.format(col, val)
# if name not in self.feature_index_:
# self.feature_index_[name] = last_idx
# last_idx += 1
# self.feature_index_[col] = last_idx
# last_idx += 1
# return self
#
# def fit_transform(self, df, y=None):
# self.fit(df, y)
# return self.transform(df)
#
# def transform_row_(self, row, t):
# ffm = []
# if self.y is not None:
# ffm.append(str(row.loc[row.index == self.y][0]))
# if self.y is None:
# ffm.append(str(0))
#
# for col, val in row.loc[row.index != self.y].to_dict().items():
# col_type = t[col]
# name = '{}_{}'.format(col, val)
# if col_type.kind == 'O':
# ffm.append('{}:{}:1'.format(self.field_index_[col], self.feature_index_[name]))
# elif col_type.kind == 'i':
# ffm.append('{}:{}:{}'.format(self.field_index_[col], self.feature_index_[col], val))
# return ' '.join(ffm)
#
# def transform(self, df):
# t = df.dtypes.to_dict()
# return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})
#
# # This method is not part of the original class; it was added to check whether a user exists in the training data set
# def is_feature_index_exist(self, name):
# if name in self.feature_index_:
# return True
# else:
# return False
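To make the encoding performed by multiFFMFormatPandas concrete, here is a small worked example. The DataFrame contents are invented, the exact indices depend on column order, and the row.loc[...][0] label lookup may emit a FutureWarning on recent pandas:

import pandas as pd
from utils import multiFFMFormatPandas  # this commit's local/utils.py, assumed importable

df = pd.DataFrame({"y": [1, 0],
                   "city": ["beijing", "shanghai"],
                   "channel": ["app", "web"]})

ffm = multiFFMFormatPandas().fit(df, y="y")
line = ffm.transform_row_(df.iloc[0], df.dtypes.to_dict())
print(line)  # e.g. "1 1:3:1 0:6:1"

Each output line is libffm format: the label first, then one field_index:feature_index:value triple per non-label column, where categorical (object-dtype) columns get value 1 and integer columns keep their raw value. This is the text format that xlearn's create_ffm consumes in precitDIaryLocal.py above.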