Project: ML / ffm-baseline
Commit: a3b3de0c (parent ff8ced2c)
Author: 张彦钊, May 21, 2019
Message: 修改测试文件 (modify the test files)

Showing 2 changed files with 384 additions and 4 deletions:
  tensnsorflow/multi.py        +4   -4
  tensnsorflow/train_multi.py  +380 -0  (new file)
tensnsorflow/multi.py

@@ -165,8 +165,8 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
         .toDF("app_list","level2_ids","level3_ids","y","z","ucity_id","ccity_name","device_type",
               "manufacturer","channel","time","hospital_id","treatment_method","price_min","price_max",
               "treatment_time","maintain_time",
-              "recover_time","top","stat_date").write.format("tfrecords").option("recordType", "Example") \
-        .repartition(1).save(path=path+"native/", mode="overwrite")
+              "recover_time","top","stat_date").repartition(1).write.format("tfrecords").option("recordType", "Example") \
+        .save(path=path+"native/", mode="overwrite")
     nearby_pre = spark.createDataFrame(rdd.filter(lambda x: x[6] == 1).map(lambda x: (x[3], x[4], x[5]))) \
         .toDF("city","uid","cid_id")
@@ -181,8 +181,8 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
         .toDF("app_list","level2_ids","level3_ids","y","z","ucity_id","ccity_name","device_type",
               "manufacturer","channel","time","hospital_id","treatment_method","price_min","price_max",
               "treatment_time","maintain_time",
-              "recover_time","top","stat_date").write.format("tfrecords").option("recordType", "Example") \
-        .repartition(1).save(path=path+"nearby/", mode="overwrite")
+              "recover_time","top","stat_date").repartition(1).write.format("tfrecords").option("recordType", "Example") \
+        .save(path=path+"nearby/", mode="overwrite")
     rdd.unpersist()
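In both hunks the change is the same: repartition(1) moves from the write chain onto the DataFrame itself. In PySpark, repartition is a DataFrame method, and DataFrameWriter (what write/format/option return) has no such method, so the removed ordering would raise an AttributeError when executed; the corrected chain coalesces the output to a single partition first and then hands it to the "tfrecords" writer. A minimal sketch of the corrected pattern, assuming the spark-tensorflow-connector supplies the "tfrecords" data source (the DataFrame, columns, and output path below are illustrative, not taken from the commit):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("tfrecords-demo").getOrCreate()
    df = spark.createDataFrame([(1.0, 0.0, "beijing"), (0.0, 0.0, "shanghai")], ["y", "z", "ucity_id"])

    # repartition() belongs to the DataFrame, so it has to come before .write
    df.repartition(1) \
      .write.format("tfrecords").option("recordType", "Example") \
      .save(path="hdfs:///tmp/demo_tfrecords/", mode="overwrite")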
tensnsorflow/train_multi.py  (new file, mode 100644)
#coding=utf-8
#from __future__ import absolute_import
#from __future__ import division
#from __future__ import print_function
#import argparse
import shutil
import os
import json
import glob
from datetime import date, timedelta
import random
import tensorflow as tf
#################### CMD Arguments ####################
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_integer("dist_mode", 0, "distribution mode {0-local, 1-single_dist, 2-multi_dist}")
tf.app.flags.DEFINE_string("ps_hosts", '', "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", '', "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("job_name", '', "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("num_threads", 16, "Number of threads")
tf.app.flags.DEFINE_integer("feature_size", 0, "Number of features")
tf.app.flags.DEFINE_integer("field_size", 0, "Number of common fields")
tf.app.flags.DEFINE_integer("embedding_size", 32, "Embedding size")
tf.app.flags.DEFINE_integer("num_epochs", 10, "Number of epochs")
tf.app.flags.DEFINE_integer("batch_size", 64, "Number of batch size")
tf.app.flags.DEFINE_integer("log_steps", 1000, "save summary every steps")
tf.app.flags.DEFINE_float("learning_rate", 0.0005, "learning rate")
tf.app.flags.DEFINE_float("l2_reg", 0.0001, "L2 regularization")
tf.app.flags.DEFINE_string("loss_type", 'log_loss', "loss type {square_loss, log_loss}")
tf.app.flags.DEFINE_float("ctr_task_wgt", 0.5, "loss weight of ctr task")
tf.app.flags.DEFINE_string("optimizer", 'Adam', "optimizer type {Adam, Adagrad, GD, Momentum}")
tf.app.flags.DEFINE_string("deep_layers", '256,128,64', "deep layers")
tf.app.flags.DEFINE_string("dropout", '0.5,0.5,0.5', "dropout rate")
tf.app.flags.DEFINE_boolean("batch_norm", False, "perform batch normalization (True or False)")
tf.app.flags.DEFINE_float("batch_norm_decay", 0.9, "decay for the moving average (recommend trying decay=0.9)")
tf.app.flags.DEFINE_string("data_dir", '', "data dir")
tf.app.flags.DEFINE_string("dt_dir", '', "data dt partition")
tf.app.flags.DEFINE_string("model_dir", '', "model checkpoint dir")
tf.app.flags.DEFINE_string("servable_model_dir", '', "export servable model for TensorFlow Serving")
tf.app.flags.DEFINE_string("task_type", 'train', "task type {train, infer, eval, export}")
tf.app.flags.DEFINE_boolean("clear_existing_model", False, "clear existing model or not")
#40362692,0,0,216:9342395:1.0 301:9351665:1.0 205:7702673:1.0 206:8317829:1.0 207:8967741:1.0 508:9356012:2.30259 210:9059239:1.0 210:9042796:1.0 210:9076972:1.0 210:9103884:1.0 210:9063064:1.0 127_14:3529789:2.3979 127_14:3806412:2.70805
def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
    print('Parsing', filenames)
    def _parse_fn(record):
        features = {
            "y": tf.FixedLenFeature([], tf.float32),
            "z": tf.FixedLenFeature([], tf.float32),
            "ids": tf.FixedLenFeature([FLAGS.field_size], tf.int64),
            "app_list": tf.VarLenFeature(tf.int64),
            "level2_list": tf.VarLenFeature(tf.int64),
            "level3_list": tf.VarLenFeature(tf.int64),
            "tag1_list": tf.VarLenFeature(tf.int64),
            "tag2_list": tf.VarLenFeature(tf.int64),
            "tag3_list": tf.VarLenFeature(tf.int64),
            "tag4_list": tf.VarLenFeature(tf.int64),
            "tag5_list": tf.VarLenFeature(tf.int64),
            "tag6_list": tf.VarLenFeature(tf.int64),
            "tag7_list": tf.VarLenFeature(tf.int64)
        }
        parsed = tf.parse_single_example(record, features)
        y = parsed.pop('y')
        z = parsed.pop('z')
        return parsed, {"y": y, "z": z}

    # Extract lines from input files using the Dataset API, can pass one filename or filename list
    dataset = tf.data.TFRecordDataset(filenames).map(_parse_fn, num_parallel_calls=10).prefetch(500000)  # multi-thread pre-process then prefetch

    # Randomizes input using a window of 256 elements (read into memory)
    if perform_shuffle:
        dataset = dataset.shuffle(buffer_size=256)

    # epochs from blending together.
    dataset = dataset.repeat(num_epochs)
    dataset = dataset.batch(batch_size)  # Batch size to use

    # dataset = dataset.padded_batch(batch_size, padded_shapes=({"feeds_ids": [None], "feeds_vals": [None], "title_ids": [None]}, [None]))  # pad variable-length features
    # return dataset.make_one_shot_iterator()
    iterator = dataset.make_one_shot_iterator()
    batch_features, batch_labels = iterator.get_next()
    # return tf.reshape(batch_ids,shape=[-1,field_size]), tf.reshape(batch_vals,shape=[-1,field_size]), batch_labels
    # print("-"*100)
    # print(batch_features,batch_labels)
    return batch_features, batch_labels
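# --- Editor's note (illustrative sketch, not part of this commit) ----------------------------
# input_fn above expects TFRecord Examples whose feature names match _parse_fn's spec.
# The helper below writes one such toy record; all values are made up, and FLAGS.field_size
# would have to equal the length of "ids" (3 here) for parsing to succeed.
def _example_write_toy_record(path="toy.tfrecord"):
    int_feats = {
        "ids": [3, 17, 42], "app_list": [5, 9], "level2_list": [7], "level3_list": [11],
        "tag1_list": [2], "tag2_list": [4], "tag3_list": [6], "tag4_list": [8],
        "tag5_list": [10], "tag6_list": [12], "tag7_list": [14],
    }
    feature = {k: tf.train.Feature(int64_list=tf.train.Int64List(value=v)) for k, v in int_feats.items()}
    feature["y"] = tf.train.Feature(float_list=tf.train.FloatList(value=[1.0]))  # click label
    feature["z"] = tf.train.Feature(float_list=tf.train.FloatList(value=[0.0]))  # conversion label
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    with tf.python_io.TFRecordWriter(path) as writer:
        writer.write(example.SerializeToString())
# ----------------------------------------------------------------------------------------------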
def model_fn(features, labels, mode, params):
    """Build Model function f(x) for Estimator."""
    #------hyperparameters----
    field_size = params["field_size"]
    feature_size = params["feature_size"]
    embedding_size = params["embedding_size"]
    l2_reg = params["l2_reg"]
    learning_rate = params["learning_rate"]
    #optimizer = params["optimizer"]
    layers = list(map(int, params["deep_layers"].split(',')))
    dropout = list(map(float, params["dropout"].split(',')))
    ctr_task_wgt = params["ctr_task_wgt"]
    common_dims = field_size * embedding_size

    #------build weights------
    Feat_Emb = tf.get_variable(name='embeddings', shape=[feature_size, embedding_size], initializer=tf.glorot_normal_initializer())

    feat_ids = features['ids']
    app_list = features['app_list']
    level2_list = features['level2_list']
    level3_list = features['level3_list']
    tag1_list = features['tag1_list']
    tag2_list = features['tag2_list']
    tag3_list = features['tag3_list']
    tag4_list = features['tag4_list']
    tag5_list = features['tag5_list']
    tag6_list = features['tag6_list']
    tag7_list = features['tag7_list']

    if FLAGS.task_type != "infer":
        y = labels['y']
        z = labels['z']

    #------build f(x)------
    with tf.variable_scope("Shared-Embedding-layer"):
        embedding_id = tf.nn.embedding_lookup(Feat_Emb, feat_ids)
        app_id = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=app_list, sp_weights=None, combiner="sum")
        level2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=level2_list, sp_weights=None, combiner="sum")
        level3 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=level3_list, sp_weights=None, combiner="sum")
        tag1 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag1_list, sp_weights=None, combiner="sum")
        tag2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag2_list, sp_weights=None, combiner="sum")
        tag3 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag3_list, sp_weights=None, combiner="sum")
        tag4 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag4_list, sp_weights=None, combiner="sum")
        tag5 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag5_list, sp_weights=None, combiner="sum")
        tag6 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag6_list, sp_weights=None, combiner="sum")
        tag7 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag7_list, sp_weights=None, combiner="sum")

        # x_concat = tf.reshape(embedding_id, shape=[-1, common_dims])  # None * (F * K)
        x_concat = tf.concat([tf.reshape(embedding_id, shape=[-1, common_dims]),
                              app_id, level2, level3, tag1, tag2, tag3, tag4, tag5, tag6, tag7], axis=1)
    with tf.name_scope("CVR_Task"):
        if mode == tf.estimator.ModeKeys.TRAIN:
            train_phase = True
        else:
            train_phase = False
        x_cvr = x_concat
        for i in range(len(layers)):
            x_cvr = tf.contrib.layers.fully_connected(inputs=x_cvr, num_outputs=layers[i],
                weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='cvr_mlp%d' % i)
            if FLAGS.batch_norm:
                x_cvr = batch_norm_layer(x_cvr, train_phase=train_phase, scope_bn='cvr_bn_%d' % i)  # placed after ReLU: https://github.com/ducha-aiki/caffenet-benchmark/blob/master/batchnorm.md#bn----before-or-after-relu
            if mode == tf.estimator.ModeKeys.TRAIN:
                x_cvr = tf.nn.dropout(x_cvr, keep_prob=dropout[i])  # Apply Dropout after all BN layers and set dropout=0.8 (drop_ratio=0.2)

        y_cvr = tf.contrib.layers.fully_connected(inputs=x_cvr, num_outputs=1, activation_fn=tf.identity,
            weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='cvr_out')
        y_cvr = tf.reshape(y_cvr, shape=[-1])

    with tf.name_scope("CTR_Task"):
        if mode == tf.estimator.ModeKeys.TRAIN:
            train_phase = True
        else:
            train_phase = False
        x_ctr = x_concat
        for i in range(len(layers)):
            x_ctr = tf.contrib.layers.fully_connected(inputs=x_ctr, num_outputs=layers[i],
                weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='ctr_mlp%d' % i)
            if FLAGS.batch_norm:
                x_ctr = batch_norm_layer(x_ctr, train_phase=train_phase, scope_bn='ctr_bn_%d' % i)  # placed after ReLU: https://github.com/ducha-aiki/caffenet-benchmark/blob/master/batchnorm.md#bn----before-or-after-relu
            if mode == tf.estimator.ModeKeys.TRAIN:
                x_ctr = tf.nn.dropout(x_ctr, keep_prob=dropout[i])  # Apply Dropout after all BN layers and set dropout=0.8 (drop_ratio=0.2)

        y_ctr = tf.contrib.layers.fully_connected(inputs=x_ctr, num_outputs=1, activation_fn=tf.identity,
            weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='ctr_out')
        y_ctr = tf.reshape(y_ctr, shape=[-1])
    with tf.variable_scope("MTL-Layer"):
        pctr = tf.sigmoid(y_ctr)
        pcvr = tf.sigmoid(y_cvr)
        pctcvr = pctr * pcvr

    predictions = {"pcvr": pcvr, "pctr": pctr, "pctcvr": pctcvr}
    export_outputs = {tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: tf.estimator.export.PredictOutput(predictions)}
    # Provide an estimator spec for `ModeKeys.PREDICT`
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions, export_outputs=export_outputs)

    if FLAGS.task_type != "infer":
        #------build loss------
        ctr_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y_ctr, labels=y))
        # cvr_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y_ctcvr, labels=z))
        cvr_loss = tf.reduce_mean(tf.losses.log_loss(predictions=pctcvr, labels=z))
        loss = ctr_task_wgt * ctr_loss + (1 - ctr_task_wgt) * cvr_loss + l2_reg * tf.nn.l2_loss(Feat_Emb)

        tf.summary.scalar('ctr_loss', ctr_loss)
        tf.summary.scalar('cvr_loss', cvr_loss)

        # Provide an estimator spec for `ModeKeys.EVAL`
        eval_metric_ops = {
            "CTR_AUC": tf.metrics.auc(y, pctr),
            # "CTR_F1": tf.contrib.metrics.f1_score(y,pctr),
            # "CTR_Precision": tf.metrics.precision(y,pctr),
            # "CTR_Recall": tf.metrics.recall(y,pctr),
            "CVR_AUC": tf.metrics.auc(z, pcvr),
            "CTCVR_AUC": tf.metrics.auc(z, pctcvr)
        }
        if mode == tf.estimator.ModeKeys.EVAL:
            return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions, loss=loss, eval_metric_ops=eval_metric_ops)

        #------build optimizer------
        if FLAGS.optimizer == 'Adam':
            optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate, beta1=0.9, beta2=0.999, epsilon=1e-8)
        elif FLAGS.optimizer == 'Adagrad':
            optimizer = tf.train.AdagradOptimizer(learning_rate=learning_rate, initial_accumulator_value=1e-8)
        elif FLAGS.optimizer == 'Momentum':
            optimizer = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.95)
        elif FLAGS.optimizer == 'ftrl':
            optimizer = tf.train.FtrlOptimizer(learning_rate)

        train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())

        # Provide an estimator spec for `ModeKeys.TRAIN` modes
        if mode == tf.estimator.ModeKeys.TRAIN:
            return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions, loss=loss, train_op=train_op)
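# --- Editor's note (toy numeric check, not part of this commit) ------------------------------
# The objective above blends the click loss with a log loss on pctcvr = sigmoid(y_ctr) * sigmoid(y_cvr),
# so the CVR tower is supervised only through that product (the ESMM formulation).
# With made-up logits and ctr_task_wgt = 0.5 (the embedding L2 term omitted):
def _example_esmm_loss_check():
    import math
    y, z = 1.0, 0.0                           # clicked, but did not convert
    y_ctr_logit, y_cvr_logit = 2.0, -1.0      # hypothetical logits
    pctr = 1 / (1 + math.exp(-y_ctr_logit))   # ~0.881
    pcvr = 1 / (1 + math.exp(-y_cvr_logit))   # ~0.269
    pctcvr = pctr * pcvr                      # ~0.237
    ctr_loss = -(y * math.log(pctr) + (1 - y) * math.log(1 - pctr))        # ~0.127
    ctcvr_loss = -(z * math.log(pctcvr) + (1 - z) * math.log(1 - pctcvr))  # ~0.270
    return 0.5 * ctr_loss + 0.5 * ctcvr_loss  # ~0.199
# ----------------------------------------------------------------------------------------------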
def batch_norm_layer(x, train_phase, scope_bn):
    bn_train = tf.contrib.layers.batch_norm(x, decay=FLAGS.batch_norm_decay, center=True, scale=True,
                                            updates_collections=None, is_training=True, reuse=None, scope=scope_bn)
    bn_infer = tf.contrib.layers.batch_norm(x, decay=FLAGS.batch_norm_decay, center=True, scale=True,
                                            updates_collections=None, is_training=False, reuse=True, scope=scope_bn)
    z = tf.cond(tf.cast(train_phase, tf.bool), lambda: bn_train, lambda: bn_infer)
    return z
def set_dist_env():
    if FLAGS.dist_mode == 1:
        # Local distributed test mode: 1 chief, 1 ps, 1 evaluator
        ps_hosts = FLAGS.ps_hosts.split(',')
        chief_hosts = FLAGS.chief_hosts.split(',')
        task_index = FLAGS.task_index
        job_name = FLAGS.job_name
        print('ps_host', ps_hosts)
        print('chief_hosts', chief_hosts)
        print('job_name', job_name)
        print('task_index', str(task_index))
        # no worker entry
        tf_config = {
            'cluster': {'chief': chief_hosts, 'ps': ps_hosts},
            'task': {'type': job_name, 'index': task_index}
        }
        print(json.dumps(tf_config))
        os.environ['TF_CONFIG'] = json.dumps(tf_config)
    elif FLAGS.dist_mode == 2:
        # Cluster distributed mode
        ps_hosts = FLAGS.ps_hosts.split(',')
        worker_hosts = FLAGS.worker_hosts.split(',')
        chief_hosts = worker_hosts[0:1]  # get first worker as chief
        worker_hosts = worker_hosts[2:]  # the rest as worker
        task_index = FLAGS.task_index
        job_name = FLAGS.job_name
        print('ps_host', ps_hosts)
        print('worker_host', worker_hosts)
        print('chief_hosts', chief_hosts)
        print('job_name', job_name)
        print('task_index', str(task_index))
        # use #worker=0 as chief
        if job_name == "worker" and task_index == 0:
            job_name = "chief"
        # use #worker=1 as evaluator
        if job_name == "worker" and task_index == 1:
            job_name = 'evaluator'
            task_index = 0
        # the others as worker
        if job_name == "worker" and task_index > 1:
            task_index -= 2
        tf_config = {
            'cluster': {'chief': chief_hosts, 'worker': worker_hosts, 'ps': ps_hosts},
            'task': {'type': job_name, 'index': task_index}
        }
        print(json.dumps(tf_config))
        os.environ['TF_CONFIG'] = json.dumps(tf_config)
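# --- Editor's note (illustrative only, not part of this commit) ------------------------------
# With made-up addresses and --dist_mode=2 --job_name=worker --task_index=2,
# --ps_hosts=10.0.0.1:2222 --worker_hosts=10.0.0.2:2222,10.0.0.3:2222,10.0.0.4:2222,
# set_dist_env() promotes worker 0 to chief, turns worker 1 into the evaluator, shifts the
# remaining indices down by 2, and writes a TF_CONFIG equivalent to:
_example_tf_config = {
    'cluster': {'chief': ['10.0.0.2:2222'],
                'worker': ['10.0.0.4:2222'],
                'ps': ['10.0.0.1:2222']},
    'task': {'type': 'worker', 'index': 0}
}
# ----------------------------------------------------------------------------------------------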
def main(_):
    #------check Arguments------
    if FLAGS.dt_dir == "":
        FLAGS.dt_dir = (date.today() + timedelta(-1)).strftime('%Y%m%d')
    FLAGS.model_dir = FLAGS.model_dir + FLAGS.dt_dir
    # FLAGS.data_dir = FLAGS.data_dir + FLAGS.dt_dir

    print('task_type ', FLAGS.task_type)
    print('model_dir ', FLAGS.model_dir)
    print('data_dir ', FLAGS.data_dir)
    print('dt_dir ', FLAGS.dt_dir)
    print('num_epochs ', FLAGS.num_epochs)
    print('feature_size ', FLAGS.feature_size)
    print('field_size ', FLAGS.field_size)
    print('embedding_size ', FLAGS.embedding_size)
    print('batch_size ', FLAGS.batch_size)
    print('deep_layers ', FLAGS.deep_layers)
    print('dropout ', FLAGS.dropout)
    print('loss_type ', FLAGS.loss_type)
    print('optimizer ', FLAGS.optimizer)
    print('learning_rate ', FLAGS.learning_rate)
    print('l2_reg ', FLAGS.l2_reg)
    print('ctr_task_wgt ', FLAGS.ctr_task_wgt)

    #------init Envs------
    path = "hdfs:///strategy/esmm/"
    tr_files = [path + "tr/part-r-00000"]
    va_files = [path + "va/part-r-00000"]
    # tr_files = glob.glob("%s/tr/*tfrecord" % FLAGS.data_dir)
    # random.shuffle(tr_files)
    # print("tr_files:", tr_files)
    # va_files = glob.glob("%s/va/*tfrecord" % FLAGS.data_dir)
    # print("va_files:", va_files)
    # te_files = glob.glob("%s/*tfrecord" % FLAGS.data_dir)
    # print("te_files:", te_files)

    if FLAGS.clear_existing_model:
        try:
            shutil.rmtree(FLAGS.model_dir)
        except Exception as e:
            print(e, "at clear_existing_model")
        else:
            print("existing model cleaned at %s" % FLAGS.model_dir)

    set_dist_env()

    #------build Tasks------
    model_params = {
        "field_size": FLAGS.field_size,
        "feature_size": FLAGS.feature_size,
        "embedding_size": FLAGS.embedding_size,
        "learning_rate": FLAGS.learning_rate,
        "l2_reg": FLAGS.l2_reg,
        "deep_layers": FLAGS.deep_layers,
        "dropout": FLAGS.dropout,
        "ctr_task_wgt": FLAGS.ctr_task_wgt
    }
    config = tf.estimator.RunConfig().replace(
        session_config=tf.ConfigProto(device_count={'GPU': 0, 'CPU': FLAGS.num_threads}),
        log_step_count_steps=FLAGS.log_steps, save_summary_steps=FLAGS.log_steps)
    Estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir=FLAGS.model_dir, params=model_params, config=config)

    if FLAGS.task_type == 'train':
        train_spec = tf.estimator.TrainSpec(input_fn=lambda: input_fn(tr_files, num_epochs=FLAGS.num_epochs, batch_size=FLAGS.batch_size))
        eval_spec = tf.estimator.EvalSpec(input_fn=lambda: input_fn(va_files, num_epochs=1, batch_size=FLAGS.batch_size),
                                          steps=None, start_delay_secs=1000, throttle_secs=1200)
        result = tf.estimator.train_and_evaluate(Estimator, train_spec, eval_spec)
        for key, value in sorted(result[0].items()):
            print('%s: %s' % (key, value))
    elif FLAGS.task_type == 'eval':
        result = Estimator.evaluate(input_fn=lambda: input_fn(va_files, num_epochs=1, batch_size=FLAGS.batch_size))
        for key, value in sorted(result.items()):
            print('%s: %s' % (key, value))
    elif FLAGS.task_type == 'infer':
        preds = Estimator.predict(input_fn=lambda: input_fn(te_files, num_epochs=1, batch_size=FLAGS.batch_size),
                                  predict_keys=["pctcvr", "pctr", "pcvr"])
        with open(FLAGS.data_dir + "/pred.txt", "w") as fo:
            print("-" * 100)
        with open(FLAGS.data_dir + "/pred.txt", "w") as fo:
            for prob in preds:
                fo.write("%f\t%f\t%f\n" % (prob['pctr'], prob['pcvr'], prob['pctcvr']))
    elif FLAGS.task_type == 'export':
        print("Not Implemented, Do It Yourself!")


if __name__ == "__main__":
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run()
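The 'export' branch above only prints a placeholder. As a hedged sketch (not part of the commit), one way to fill it in with the TF 1.x Estimator API is to build a parsing serving-input receiver from the same feature spec that input_fn uses and export to the servable_model_dir flag that is already defined; note that model_fn's labels guard only checks task_type != "infer", so it would also need to tolerate the export path before this could run:

    # Hedged sketch only; assumes it replaces the 'export' branch inside main().
    feature_spec = {
        "ids": tf.FixedLenFeature([FLAGS.field_size], tf.int64),
        "app_list": tf.VarLenFeature(tf.int64),
        "level2_list": tf.VarLenFeature(tf.int64),
        "level3_list": tf.VarLenFeature(tf.int64),
        "tag1_list": tf.VarLenFeature(tf.int64),
        "tag2_list": tf.VarLenFeature(tf.int64),
        "tag3_list": tf.VarLenFeature(tf.int64),
        "tag4_list": tf.VarLenFeature(tf.int64),
        "tag5_list": tf.VarLenFeature(tf.int64),
        "tag6_list": tf.VarLenFeature(tf.int64),
        "tag7_list": tf.VarLenFeature(tf.int64),
    }
    serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
    Estimator.export_savedmodel(FLAGS.servable_model_dir, serving_input_receiver_fn)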