郭羽 / serviceRec / Commits / ebe58143

Commit ebe58143, authored 3 years ago by 郭羽
Parent: 546e2013

    service model optimization

Showing 2 changed files, with 243 additions and 0 deletions (+243 / -0):

    spark/featureEng2.py       +2    -0
    train/train_service2.py    +241  -0
spark/featureEng2.py (view file @ ebe58143)
@@ -855,5 +855,6 @@ if __name__ == '__main__':
     train_df = samplesWithUserFeatures.toPandas()
     train_df = pd.DataFrame(train_df)
     train_df.to_csv("/tmp/service_{}.csv".format(endDay))
+    print("training data written successfully")
     spark.stop()
\ No newline at end of file
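The hunk above collects the Spark result to the driver with toPandas() before writing a CSV, so it only suits data that fits in driver memory. A minimal standalone sketch of the same write path, with a local SparkSession and a toy DataFrame standing in for samplesWithUserFeatures (names and paths here are illustrative, not the production job):

    from pyspark.sql import SparkSession
    import pandas as pd

    # Toy stand-in for the job's samplesWithUserFeatures DataFrame
    spark = SparkSession.builder.master("local[1]").getOrCreate()
    samples = spark.createDataFrame([(1, 0.5), (2, 0.7)],
                                    ["itemid", "item_smart_rank2"])

    # Same pattern as the hunk: collect to the driver, wrap, write CSV
    train_df = pd.DataFrame(samples.toPandas())
    train_df.to_csv("/tmp/service_example.csv")
    print("training data written successfully")
    spark.stop()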
train/train_service2.py (new file, mode 0 → 100644; view file @ ebe58143)
import tensorflow as tf
import json
import pandas as pd
import time
import sys
import os
from datetime import date, timedelta

# Make the project root importable so utils.configUtils resolves
sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
import utils.configUtils as configUtils
ITEM_NUMBER_COLUMNS = ["item_" + c for c in ["smart_rank2"]]
embedding_columns = ["itemid", "userid"] + ["item_" + c for c in ["doctor_id", "hospital_id", "merchant_id"]]
multi_columns = ["tags_v3", "first_demands", "second_demands", "first_solutions", "second_solutions", "first_positions", "second_positions"]
one_hot_columns = ["user_os"] + ["item_" + c for c in ["service_type", "doctor_type", "doctor_famous", "hospital_city_tag_id", "hospital_type", "hospital_is_high_quality"]]
# history_columns = ["userRatedHistory"]

# Data loading
# data_path_train = "/Users/zhigangzheng/Desktop/work/guoyu/service_sort/train/part-00000-a61205d1-ad4e-4fa7-895d-ad8db41189e6-c000.csv"
# data_path_test = "/Users/zhigangzheng/Desktop/work/guoyu/service_sort/train/part-00000-a61205d1-ad4e-4fa7-895d-ad8db41189e6-c000.csv"
VERSION = configUtils.SERVICE_VERSION
trainDay = time.strftime("%Y%m%d%H", time.localtime())
data_path_train = "/data/files/service_feature_{}_train.csv".format(VERSION)
data_path_test = "/data/files/service_feature_{}_test.csv".format(VERSION)
model_file = configUtils.SERVICE_MODEL_PATH + "/" + trainDay
def is_float(s):
    try:
        float(s)
        return True
    except ValueError:
        return False
# Data type conversion
def csvTypeConvert(columns, df, data_vocab):
    df["label"] = df["label"].astype("int")
    for k in columns:
        # Categorical (vocabulary) features: fill NA with "-1", cast to string
        if data_vocab.get(k):
            df[k] = df[k].fillna("-1")
            df[k] = df[k].astype("string")
        elif k != "label":
            # df[k] = df[k].map(lambda x: x if is_float(x) else 0)
            df[k] = df[k].fillna(0)
            df[k] = df[k].astype("float")
    # print(df.dtypes)
    return df
def loadData(data_path):
    print("Reading data...")
    timestmp1 = int(round(time.time() * 1000))
    df = pd.read_csv(data_path, sep="|")
    timestmp2 = int(round(time.time() * 1000))
    print("Reading data took {} ms".format(timestmp2 - timestmp1))
    return df
# Map a count to a sample weight; only referenced by the commented-out
# weighting in getDataSet()
def getWeight(x):
    res = 1
    try:
        p = int(x)
        if p > 0 and p <= 5:
            res = 2
        elif p > 5 and p <= 10:
            res = 3
        elif p > 10:
            res = 4
    except Exception as e:
        print(e)
    return res
def getDataSet(df, shuffleSize=10000, batchSize=128):
    # print(df.dtypes)
    labels = df.pop('label')
    # df["rating"] = df["rating"].map(getWeight)
    # weights = df.pop('rating')
    dataSet = tf.data.Dataset.from_tensor_slices((dict(df), labels)).shuffle(shuffleSize).batch(batchSize)
    return dataSet
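# Note on getTrainColumns() below: every categorical feature present in
# data_vocab gets a vocabulary-list column wrapped in an embedding --
# dimension 5 for cross features (names containing "__"), 3 for
# one_hot_columns and bucketized features, and 10 for the remaining
# vocabulary features -- while features ending in "_number" become plain
# numeric columns.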
def getTrainColumns(train_columns, data_vocab):
    emb_columns = []
    number_columns = []
    oneHot_columns = []
    dataColumns = []
    inputs = {}
    # Categorical features
    for feature in train_columns:
        if data_vocab.get(feature):
            if feature.count("__") > 0:
                cat_col = tf.feature_column.categorical_column_with_vocabulary_list(
                    key=feature, vocabulary_list=data_vocab[feature])
                col = tf.feature_column.embedding_column(cat_col, 5)
                emb_columns.append(col)
                dataColumns.append(feature)
                inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
            elif feature in one_hot_columns or feature.count("Bucket") > 0:
                cat_col = tf.feature_column.categorical_column_with_vocabulary_list(
                    key=feature, vocabulary_list=data_vocab[feature])
                # col = tf.feature_column.indicator_column(cat_col)
                col = tf.feature_column.embedding_column(cat_col, 3)
                oneHot_columns.append(col)
                dataColumns.append(feature)
                inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
            else:
                cat_col = tf.feature_column.categorical_column_with_vocabulary_list(
                    key=feature, vocabulary_list=data_vocab[feature])
                col = tf.feature_column.embedding_column(cat_col, 10)
                emb_columns.append(col)
                dataColumns.append(feature)
                inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='string')
        elif feature.endswith("_number"):
            col = tf.feature_column.numeric_column(feature)
            number_columns.append(col)
            dataColumns.append(feature)
            inputs[feature] = tf.keras.layers.Input(name=feature, shape=(), dtype='float32')
    return emb_columns, number_columns, oneHot_columns, dataColumns, inputs
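# Note on train() below: a wide-and-deep style layout -- DenseFeatures
# flattens all feature columns into one "wide" vector, a Dense(64) + Dropout
# branch forms the "deep" side, the two are concatenated, and a single
# sigmoid unit outputs the probability, trained with binary cross-entropy
# and tracked with accuracy, ROC AUC, and PR AUC.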
def train(emb_columns, number_columns, oneHot_columns, inputs, train_dataset):
    wide = tf.keras.layers.DenseFeatures(emb_columns + number_columns + oneHot_columns)(inputs)
    deep = tf.keras.layers.Dense(64, activation='relu')(wide)
    deep = tf.keras.layers.Dropout(0.2)(deep)
    concat_layer = tf.keras.layers.concatenate([wide, deep], axis=1)
    # deep = tf.keras.layers.Dense(64, activation='relu')(deep)
    # deep = tf.keras.layers.Dropout(0.5)(deep)
    output_layer = tf.keras.layers.Dense(1, activation='sigmoid')(concat_layer)
    # output_layer = FM(1)(deep)
    model = tf.keras.Model(inputs, output_layer)
    # compile the model, set loss function, optimizer and evaluation metrics
    model.compile(
        loss='binary_crossentropy',
        optimizer='adam',
        metrics=['accuracy', tf.keras.metrics.AUC(curve='ROC'), tf.keras.metrics.AUC(curve='PR')])
    # train the model
    print("train start...")
    model.fit(train_dataset, epochs=5)
    print("train end...")
    print("train save...")
    model.save(model_file, include_optimizer=False, save_format='tf')
    return model
def evaluate(model, test_dataset):
    if not model:
        print("Loading model")
        model = tf.keras.models.load_model(model_file)
    # evaluate the model
    timestmp1 = int(round(time.time()))
    print("evaluate:")
    test_loss, test_accuracy, test_roc_auc, test_pr_auc = model.evaluate(test_dataset)
    print('\n\nTest Loss {}, Test Accuracy {}, Test ROC AUC {}, Test PR AUC {}'.format(
        test_loss, test_accuracy, test_roc_auc, test_pr_auc))
    print("Evaluation took (s):", int(round(time.time())) - timestmp1)
def predict(model_path, df):
    print("Loading model...")
    model_new = tf.keras.models.load_model(model_path)
    # model_new.summary()
    print("Model loaded...")
    # model = tf.keras.models.model_from_json(model.to_json)
    n = 1000
    dd = dict(df.sample(n=n))
    for i in range(10):
        timestmp1 = int(round(time.time() * 1000))
        model_new.predict(dd, batch_size=10000)
        print("Test samples: {}, prediction took {} ms".format(
            n, int(round(time.time() * 1000)) - timestmp1))
def addDays(n, format="%Y%m%d"):
    return (date.today() + timedelta(days=n)).strftime(format)
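# Note on the entry point below: the training CSV is split on its 'timestamp'
# column at yesterday's midnight (addDays(-1)) -- rows at or after that moment
# become the test set, everything earlier the training set.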
if __name__ == '__main__':
    curTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    print("train_service execution time: {}".format(curTime))
    splitTimestamp = int(time.mktime(time.strptime(addDays(-1), "%Y%m%d")))

    # Load the data vocabulary (originally fetched from redis)
    print("Loading model vocabulary...")
    data_vocab = json.load(open(configUtils.VOCAB_PATH, mode='r', encoding='utf-8'))
    print("Vocabulary keys:", str(data_vocab.keys()))
    # data_vocab = getDataVocabFromRedis(VERSION)
    assert data_vocab

    timestmp1 = int(round(time.time()))
    df_train = loadData(data_path_train)
    print(df_train.dtypes)
    df_test = df_train.loc[df_train['timestamp'] >= splitTimestamp]
    df_train = df_train.loc[df_train['timestamp'] < splitTimestamp]
    # df_test = loadData(data_path_test)
    timestmp2 = int(round(time.time()))
    print("Reading data took {} s".format(timestmp2 - timestmp1))

    # Collect training columns
    columns = df_train.columns.tolist()
    print("Raw data columns:")
    print(columns)
    emb_columns, number_columns, oneHot_columns, datasColumns, inputs = getTrainColumns(columns, data_vocab)
    print("Training columns:")
    print(datasColumns)

    df_train = df_train[datasColumns + ["label"]]
    df_test = df_test[datasColumns + ["label"]]
    trainSize = df_train["label"].count()
    print("trainSize:{}".format(trainSize))
    testSize = df_test["label"].count()
    print("trainSize:{}, testSize:{}".format(trainSize, testSize))

    # Data type conversion
    df_train = csvTypeConvert(datasColumns, df_train, data_vocab)
    df_test = csvTypeConvert(datasColumns, df_test, data_vocab)

    # Build the training dataset
    train_data = getDataSet(df_train, shuffleSize=trainSize)

    print("train start...")
    timestmp3 = int(round(time.time()))
    model = train(emb_columns, number_columns, oneHot_columns, inputs, train_data)
    timestmp4 = int(round(time.time()))
    print("train end... took {} h".format((timestmp4 - timestmp3) / 60 / 60))

    if testSize > 0:
        test_data = getDataSet(df_test, shuffleSize=testSize)
        evaluate(model, test_data)
        # predict(model_file, test_data)
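getTrainColumns() feeds tf.feature_column.categorical_column_with_vocabulary_list from the JSON loaded at configUtils.VOCAB_PATH, so that file maps each categorical feature name to its list of string vocabulary values. A minimal sketch of the expected shape; the keys and values here are illustrative, not the production vocabulary:

    # Illustrative vocabulary: feature name -> list of string values
    example_vocab = {
        "userid": ["-1", "u1", "u2"],
        "item_doctor_id": ["-1", "d1", "d2"],
        "user_os": ["-1", "ios", "android"],
    }

Including "-1" in each list pairs naturally with csvTypeConvert(), which fills missing categorical values with "-1".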