ML / ffm-baseline · Commits

Commit 4fa0a50f, authored Jun 24, 2019 by Your Name
Parent: ced9f2c4

Commit message: test

Showing 1 changed file with 47 additions and 36 deletions.

eda/esmm/Model_pipline/dist_sort.py → eda/esmm/Model_pipline/predict.py (+47 −36)
import tensorflow as tf
import pymysql
from pyspark.conf import SparkConf
import pytispark.pytispark as pti
from pyspark.sql import SparkSession
import datetime
import pandas as pd
from datetime import date, timedelta
import time
from pyspark import StorageLevel
from pyspark.sql import Row
import os
import sys
from sqlalchemy import create_engine
...
@@ -30,7 +25,8 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
        "tag5_list": tf.VarLenFeature(tf.int64),
        "tag6_list": tf.VarLenFeature(tf.int64),
        "tag7_list": tf.VarLenFeature(tf.int64),
-       "number": tf.VarLenFeature(tf.int64),
+       "search_tag2_list": tf.VarLenFeature(tf.int64),
+       "search_tag3_list": tf.VarLenFeature(tf.int64),
        "uid": tf.VarLenFeature(tf.string),
        "city": tf.VarLenFeature(tf.string),
        "cid_id": tf.VarLenFeature(tf.string)
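A note on the parsing spec above (an illustration, not part of the commit): tf.VarLenFeature yields tf.SparseTensor values when the serialized tf.train.Examples are parsed, which is why the model_fn below needs tf.sparse.to_dense and tf.nn.embedding_lookup_sparse. A minimal, self-contained TF 1.x sketch, with toy feature values that are purely made up:

import tensorflow as tf

example = tf.train.Example(features=tf.train.Features(feature={
    "tag5_list": tf.train.Feature(int64_list=tf.train.Int64List(value=[3, 7])),
    "uid": tf.train.Feature(bytes_list=tf.train.BytesList(value=[b"u123"])),
}))
spec = {
    "tag5_list": tf.VarLenFeature(tf.int64),
    "uid": tf.VarLenFeature(tf.string),
}
parsed = tf.parse_example([example.SerializeToString()], features=spec)
with tf.Session() as sess:
    # Each parsed value is a SparseTensorValue (indices, values, dense_shape),
    # so variable-length rows never need padding at parse time.
    print(sess.run(parsed["tag5_list"]))
    print(sess.run(parsed["uid"]))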
...
@@ -88,7 +84,8 @@ def model_fn(features, labels, mode, params):
    tag5_list = features['tag5_list']
    tag6_list = features['tag6_list']
    tag7_list = features['tag7_list']
-   number = features['number']
+   search_tag2_list = features['search_tag2_list']
+   search_tag3_list = features['search_tag3_list']
    uid = features['uid']
    city = features['city']
    cid_id = features['cid_id']
...
@@ -107,12 +104,14 @@ def model_fn(features, labels, mode, params):
    tag5 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag5_list, sp_weights=None, combiner="sum")
    tag6 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag6_list, sp_weights=None, combiner="sum")
    tag7 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=tag7_list, sp_weights=None, combiner="sum")
+   search_tag2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=search_tag2_list, sp_weights=None, combiner="sum")
+   search_tag3 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=search_tag3_list, sp_weights=None, combiner="sum")
    # x_concat = tf.reshape(embedding_id,shape=[-1, common_dims]) # None * (F * K)
-   x_concat = tf.concat([tf.reshape(embedding_id, shape=[-1, common_dims]), app_id, level2, level3, tag1, tag2, tag3, tag4, tag5, tag6, tag7], axis=1)
+   x_concat = tf.concat([tf.reshape(embedding_id, shape=[-1, common_dims]), app_id, level2, level3, tag1, tag2, tag3, tag4, tag5, tag6, tag7, search_tag2, search_tag3], axis=1)
-   sample_id = tf.sparse.to_dense(number)
    uid = tf.sparse.to_dense(uid, default_value="")
    city = tf.sparse.to_dense(city, default_value="")
    cid_id = tf.sparse.to_dense(cid_id, default_value="")
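Why combiner="sum" matters here (an illustration, not repo code): each row of sp_ids is a variable-length bag of tag ids, and embedding_lookup_sparse sums the looked-up embedding rows per bag, producing one fixed-width vector per example regardless of how many tags it has. A toy TF 1.x sketch:

import tensorflow as tf

emb = tf.constant([[0., 1.], [10., 11.], [20., 21.]])  # 3 ids, embedding dim 2
# Two "bags": example 0 holds ids {1, 2}, example 1 holds id {0}.
sp_ids = tf.SparseTensor(indices=[[0, 0], [0, 1], [1, 0]],
                         values=tf.constant([1, 2, 0], dtype=tf.int64),
                         dense_shape=[2, 2])
out = tf.nn.embedding_lookup_sparse(emb, sp_ids=sp_ids, sp_weights=None, combiner="sum")
with tf.Session() as sess:
    print(sess.run(out))  # [[30. 32.]  = emb[1] + emb[2]
                          #  [ 0.  1.]] = emb[0]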
...
@@ -149,7 +148,7 @@ def model_fn(features, labels, mode, params):
    pcvr = tf.sigmoid(y_cvr)
    pctcvr = pctr * pcvr
-   predictions = {"pcvr": pcvr, "pctr": pctr, "pctcvr": pctcvr, "sample_id": sample_id, "uid": uid, "city": city, "cid_id": cid_id}
+   predictions = {"pctcvr": pctcvr, "uid": uid, "city": city, "cid_id": cid_id}
    export_outputs = {tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: tf.estimator.export.PredictOutput(predictions)}
    # Provide an estimator spec for `ModeKeys.PREDICT`
    if mode == tf.estimator.ModeKeys.PREDICT:
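The diff collapses the body of the PREDICT branch. For orientation only, the standard TF 1.x pattern such a branch returns is sketched below; this is a generic illustration, not necessarily the exact code in this file:

import tensorflow as tf

def predict_spec(mode, predictions, export_outputs):
    # In PREDICT mode an EstimatorSpec only needs `predictions`;
    # `export_outputs` additionally defines the SavedModel serving signature.
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode,
                                          predictions=predictions,
                                          export_outputs=export_outputs)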
...
@@ -176,25 +175,11 @@ def main(te_file):
        log_step_count_steps=100, save_summary_steps=100)
    Estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir=model_dir, params=model_params, config=config)
-   preds = Estimator.predict(input_fn=lambda: input_fn(te_file, num_epochs=1, batch_size=10000), predict_keys=["pctcvr", "pctr", "pcvr", "sample_id", "uid", "city", "cid_id"])
-   # with open("/home/gmuser/esmm/nearby/pred.txt", "w") as fo:
-   #     for prob in preds:
-   #         fo.write("%f\t%f\t%f\n" % (prob['pctr'], prob['pcvr'], prob['pctcvr']))
+   preds = Estimator.predict(input_fn=lambda: input_fn(te_file, num_epochs=1, batch_size=10000), predict_keys=["pctcvr", "uid", "city", "cid_id"])
-   # ctcvr = []
-   str_result = ""
+   result = []
    for prob in preds:
-       # ctcvr.append((prob["sample_id"][0],prob['pctcvr']))
-       str_result = str_result + str(prob["sample_id"][0]) + ":" + str(prob["uid"][0]) + ":" + str(prob["city"][0]) + ":" + str(prob["cid_id"][0]) + ":" + str(prob['pctcvr']) + ";"
-       # str_result = list(prob.keys())
-   # return str_result
-   return str_result[:-1]
-   # indices = []
-   # for prob in preds:
-   #     indices.append([prob['pctr'], prob['pcvr'], prob['pctcvr']])
-   # return indices
+       result.append([str(prob["uid"][0]), str(prob["city"][0]), str(prob["cid_id"][0]), str(prob['pctcvr'])])

+def trans(x):
+   return str(x)[2:-1] if str(x)[0] == 'b' else x
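Why the new trans helper exists (an inference from context, not stated in the commit): the string features come back from Estimator.predict as numpy bytes, and str() on a bytes value keeps the b'...' repr wrapper, so trans strips it. A quick demonstration:

def trans(x):  # same helper as above
    return str(x)[2:-1] if str(x)[0] == 'b' else x

print(trans(b"beijing"))  # 'beijing' -- the b'...' wrapper is stripped
print(trans("shanghai"))  # 'shanghai' -- non-bytes values pass through
print(trans("beijing"))   # 'ijin' -- caveat: a plain str starting with 'b' is mangled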
...
@@ -208,22 +193,49 @@ def set_join(lst):
if __name__ == "__main__":
-   if sys.argv[1] == "nearby":
+   if sys.argv[1] == "native":
        b = time.time()
        print("infer native task")
        path = "hdfs://172.16.32.4:8020/strategy/esmm/"
        # df = spark.read.format("tfrecords").load(path+"test_native/part-r-00000")
        # df.show()
        te_files = ["hdfs://172.16.32.4:8020/strategy/esmm/test_native/part-r-00000"]
        print("dist predict native")
        print("耗时(秒):")  # "elapsed time (seconds):"
        print((time.time() - b))
    if sys.argv[1] == "nearby":
        print("infer nearby task")
        b = time.time()
        path = "hdfs://172.16.32.4:8020/strategy/esmm/"
        # df = spark.read.format("tfrecords").load(path+"test_nearby/part-r-00000")
        # df.show()
        te_files = ["hdfs://172.16.32.4:8020/strategy/esmm/test_nearby/part-r-00000"]
-       print("-" * 100)
-       indices = main(te_files)
-       print(indices[0])
+       result = main(te_files)
+       df = pd.DataFrame(result, columns=["uid", "city", "cid_id", "pctcvr"])
+       df.head(10)
+       host = '172.16.40.158'
+       port = 4000
+       user = 'root'
+       password = '3SYz54LS9#^9sBvC'
+       db = 'jerry_test'
+       charset = 'utf8'
+       print("耗时(min):")  # "elapsed time (minutes):"
+       print((time.time() - b) / 60)
        print("耗时(秒):")  # "elapsed time (seconds):"
        print((time.time() - b))
    else:
        print("hello")
\ No newline at end of file
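The new __main__ block defines MySQL/TiDB connection settings (host, port, user, password, db, charset), and the file imports create_engine and pymysql, but the actual write of df falls outside the shown hunks. Below is a sketch of the conventional pandas-to-MySQL path under that assumption; the table name "esmm_pred" and the masked password are placeholders:

import pandas as pd
from sqlalchemy import create_engine

df = pd.DataFrame([["u1", "beijing", "c9", "0.031"]],
                  columns=["uid", "city", "cid_id", "pctcvr"])
# mysql+pymysql:// routes SQLAlchemy through the pymysql driver imported above.
engine = create_engine("mysql+pymysql://root:***@172.16.40.158:4000/jerry_test?charset=utf8")
df.to_sql("esmm_pred", con=engine, if_exists="append", index=False)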