ML / ffm-baseline
Commit 397c99b0, authored 5 years ago by 张彦钊

change test file

Parent: c1b3e72b

Showing 3 changed files, with 774 additions and 2 deletions:
    ctr.py (+2, −2)
    eda/esmm/Model_pipline/feature_bias.py (+363, −0)
    eda/esmm/Model_pipline/train_bias.py (+409, −0)
ctr.py
@@ -175,7 +175,6 @@ def write_to_kafka():
        except kafka_errors as e:
            print(str(e))

    def Ctr(rdd):
        try:
            results = rdd
            ...

@@ -197,7 +196,8 @@ def m_decoder(s):
    ...
    if __name__ == '__main__':
        # Spark-Streaming-Kafka
        sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("kafka_test")
                          .set("spark.io.compression.codec", "lzf"))
        ssc = SQLContext(sc)
        ssc = StreamingContext(sc, 10)
        sc.setLogLevel("WARN")
    ...
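One detail the hunk makes visible: ssc is first bound to a SQLContext and then immediately rebound to a StreamingContext, so the SQLContext reference is lost. A minimal sketch of the usual setup with separate names (my naming, not from this commit):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext

sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077")
                  .setAppName("kafka_test").set("spark.io.compression.codec", "lzf"))
sc.setLogLevel("WARN")
sqlContext = SQLContext(sc)     # keep the SQL entry point under its own name
ssc = StreamingContext(sc, 10)  # 10-second micro-batches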
eda/esmm/Model_pipline/feature_bias.py (new file, 0 → 100644)
# -*- coding: utf-8 -*-
import pymysql
from pyspark.conf import SparkConf
import pytispark.pytispark as pti
from pyspark.sql import SparkSession
import datetime
import pandas as pd
import time
from pyspark import StorageLevel


def app_list_func(x, l):
    # map a comma-separated tag string to a list of integer ids; unknown tags become 0
    b = str(x).split(",")
    e = []
    for i in b:
        if i in l.keys():
            e.append(l[i])
        else:
            e.append(0)
    return e
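For intuition, a small run of app_list_func as defined above (toy values, not from the data):

# "a" and "b" are in the map, "x" is not:
print(app_list_func("a,b,x", {"a": 16, "b": 17}))   # -> [16, 17, 0]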
def get_list(db, sql, n):
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    v = list(set([i[0] for i in result]))
    app_list_value = [str(i).split(",") for i in v]
    app_list_unique = []
    for i in app_list_value:
        app_list_unique.extend(i)
    app_list_unique = list(set(app_list_unique))
    number = len(app_list_unique)
    app_list_map = dict(zip(app_list_unique, list(range(n, number + n))))
    db.close()
    return number, app_list_map


def get_map():
    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select app_list from device_app_list"
    a = time.time()
    apps_number, app_list_map = get_list(db, sql, 16)
    print("applist")
    print((time.time() - a) / 60)

    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select level2_ids from diary_feat"
    b = time.time()
    leve2_number, leve2_map = get_list(db, sql, 16 + apps_number)
    print("leve2")
    print((time.time() - b) / 60)

    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select level3_ids from diary_feat"
    c = time.time()
    leve3_number, leve3_map = get_list(db, sql, 16 + leve2_number + apps_number)
    print((time.time() - c) / 60)

    return apps_number, app_list_map, leve2_number, leve2_map, leve3_number, leve3_map
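get_list builds a global integer index over every distinct token of a comma-separated column, starting at offset n; get_map chains three such vocabularies so their id ranges do not overlap. The core mapping, isolated from the DB plumbing (toy rows, my construction):

rows = ["a,b", "b,c"]              # distinct column values
tokens = []
for value in rows:
    tokens.extend(str(value).split(","))
tokens = list(set(tokens))         # e.g. ['a', 'b', 'c'] in some order
n = 16
index_map = dict(zip(tokens, range(n, len(tokens) + n)))
print(len(tokens), index_map)      # 3 ids drawn from 16..18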
def get_unique(db, sql):
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    v = list(set([i[0] for i in result]))
    db.close()
    print(sql)
    print(len(v))
    return v


def con_sql(db, sql):
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    df = pd.DataFrame(list(result))
    db.close()
    return df


def get_pre_number():
    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select count(*) from esmm_pre_data"
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchone()[0]
    print("预测集数量:")  # prediction-set row count
    print(result)
    db.close()
def feature_engineer():
    apps_number, app_list_map, level2_number, leve2_map, level3_number, leve3_map = get_map()

    # hand-assigned marker ids for the column-name sentinels
    app_list_map["app_list"] = 16
    leve3_map["level3_ids"] = 17
    leve3_map["search_tag3"] = 18
    leve2_map["level2_ids"] = 19
    leve2_map["tag1"] = 20
    leve2_map["tag2"] = 21
    leve2_map["tag3"] = 22
    leve2_map["tag4"] = 23
    leve2_map["tag5"] = 24
    leve2_map["tag6"] = 25
    leve2_map["tag7"] = 26
    leve2_map["search_tag2"] = 27

    unique_values = []
    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct stat_date from esmm_train_bias"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct ucity_id from esmm_train_bias"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct ccity_name from esmm_train_bias"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct bias from esmm_train_bias"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct device_type from user_feature"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct manufacturer from user_feature"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct channel from user_feature"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct price_min from knowledge"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct treatment_method from knowledge"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct price_max from knowledge"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct treatment_time from knowledge"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct maintain_time from knowledge"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct recover_time from knowledge"
    unique_values.extend(get_unique(db, sql))

    # unique_values.append("video")

    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select max(stat_date) from esmm_train_bias"
    validate_date = con_sql(db, sql)[0].values.tolist()[0]
    print("validate_date:" + validate_date)
    temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
    start = (temp - datetime.timedelta(days=180)).strftime("%Y-%m-%d")
    print(start)

    features = ["ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "stat_date",
                "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
                "recover_time", "app_list", "level3_ids", "level2_ids", "bias", "search_tag2",
                "search_tag3"]

    unique_values.extend(features)
    print("unique_values length")
    print(len(unique_values))
    print("特征维度:")  # total feature dimension
    print(apps_number + level2_number + level3_number + len(unique_values))

    temp = list(range(29 + apps_number + level2_number + level3_number,
                      29 + apps_number + level2_number + level3_number + len(unique_values)))
    value_map = dict(zip(unique_values, temp))
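The id layout this sets up, as far as the code shows: ids below 16 serve as defaults, 16–28 are the hand-assigned column markers above, the three get_list vocabularies occupy the next apps_number + level2_number + level3_number slots, and value_map assigns every remaining categorical value a consecutive id starting at 29 plus those vocabulary sizes. A toy rendering of the last step (made-up sizes and values):

apps_number, level2_number, level3_number = 3, 2, 2   # made-up vocabulary sizes
unique_values = ["2019-05-01", "beijing", "wifi"]     # made-up categorical values
base = 29 + apps_number + level2_number + level3_number
value_map = dict(zip(unique_values, range(base, base + len(unique_values))))
print(value_map)   # {'2019-05-01': 36, 'beijing': 37, 'wifi': 38}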
    sql = "select e.y,e.z,e.stat_date,e.ucity_id,e.bias,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
          "u.channel,dl.app_list,feat.level3_ids,doctor.hospital_id," \
          "doris.search_tag2,doris.search_tag3," \
          "k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time," \
          "e.device_id,e.cid_id " \
          "from jerry_test.esmm_train_bias e left join jerry_test.user_feature u on e.device_id = u.device_id " \
          "left join jerry_test.device_app_list dl on e.device_id = dl.device_id " \
          "left join jerry_test.diary_feat feat on e.cid_id = feat.diary_id " \
          "left join jerry_test.knowledge k on feat.level2 = k.level2_id " \
          "left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \
          "left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \
          "left join jerry_test.search_doris doris on e.device_id = doris.device_id and e.stat_date = doris.get_date " \
          "where e.stat_date >= '{}'".format(start)

    df = spark.sql(sql)

    df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
                             "channel", "stat_date", "app_list", "hospital_id", "level3_ids",
                             "bias", "search_tag2", "search_tag3"])

    # fill nulls in each feature column with the column's own name as a sentinel value
    df = df.na.fill(dict(zip(features, features)))

    rdd = df.select("stat_date", "y", "z", "app_list", "level2_ids", "level3_ids", "ucity_id",
                    "ccity_name", "device_type", "manufacturer", "channel", "treatment_method",
                    "price_min", "price_max", "treatment_time", "maintain_time", "recover_time",
                    "bias", "search_tag2", "search_tag3", "cid_id", "device_id") \
        .rdd.repartition(200).map(
            lambda x: (x[0], float(x[1]), float(x[2]),
                       app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map),
                       app_list_func(x[5], leve3_map),
                       [value_map.get(x[0], 1), value_map.get(x[6], 2), value_map.get(x[7], 3),
                        value_map.get(x[8], 4), value_map.get(x[9], 5), value_map.get(x[10], 6),
                        value_map.get(x[11], 7), value_map.get(x[12], 8), value_map.get(x[13], 9),
                        value_map.get(x[14], 10), value_map.get(x[15], 11), value_map.get(x[16], 12),
                        value_map.get(x[17], 13)],
                       app_list_func(x[18], leve2_map), app_list_func(x[19], leve3_map),
                       x[6], x[20], x[21]))

    rdd.persist(storageLevel=StorageLevel.MEMORY_ONLY_SER)
    # TODO: remove the train filter below after launch, because the most recent
    # day's data should also be used as training data
    train = rdd.filter(lambda x: x[0] != validate_date).map(
        lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], x[10], x[11]))
    f = time.time()
    spark.createDataFrame(train) \
        .toDF("y", "z", "app_list", "level2_list", "level3_list", "ids",
              "search_tag2_list", "search_tag3_list", "city", "cid_id", "uid") \
        .repartition(1).write.format("tfrecords").save(path=path + "tr/", mode="overwrite")
    h = time.time()
    print("train tfrecord done")
    print((h - f) / 60)

    print("训练集样本总量:")  # total number of training samples
    print(rdd.count())

    # get_pre_number()

    test = rdd.filter(lambda x: x[0] == validate_date).map(
        lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], x[10], x[11]))
    spark.createDataFrame(test) \
        .toDF("y", "z", "app_list", "level2_list", "level3_list", "ids",
              "search_tag2_list", "search_tag3_list", "city", "cid_id", "uid") \
        .repartition(1).write.format("tfrecords").save(path=path + "va/", mode="overwrite")
    print("va tfrecord done")

    rdd.unpersist()

    return validate_date, value_map, app_list_map, leve2_map, leve3_map
def get_predict(date, value_map, app_list_map, leve2_map, leve3_map):
    sql = "select e.y,e.z,e.label,e.ucity_id,feat.level2_ids,e.ccity_name," \
          "u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \
          "dl.app_list,e.hospital_id,feat.level3_ids," \
          "wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \
          "ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7,doris.search_tag2,doris.search_tag3," \
          "k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \
          "from jerry_test.esmm_pre_data e " \
          "left join jerry_test.user_feature u on e.device_id = u.device_id " \
          "left join jerry_test.cid_type_top c on e.device_id = c.device_id " \
          "left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \
          "left join jerry_test.device_app_list dl on e.device_id = dl.device_id " \
          "left join jerry_test.diary_feat feat on e.cid_id = feat.diary_id " \
          "left join jerry_test.wiki_tag wiki on e.device_id = wiki.device_id " \
          "left join jerry_test.question_tag question on e.device_id = question.device_id " \
          "left join jerry_test.search_tag search on e.device_id = search.device_id " \
          "left join jerry_test.budan_tag budan on e.device_id = budan.device_id " \
          "left join jerry_test.order_tag ot on e.device_id = ot.device_id " \
          "left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \
          "left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \
          "left join jerry_test.knowledge k on feat.level2 = k.level2_id " \
          "left join jerry_test.search_doris doris on e.device_id = doris.device_id and e.stat_date = doris.get_date " \
          "where e.device_id = 'C33E2C8E-86E9-4C91-8458-526FB81E4C78'"  # restricted to one hard-coded device_id for testing

    features = ["ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
                "hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
                "maintain_time", "recover_time", "app_list", "level3_ids", "level2_ids",
                "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7", "search_tag2", "search_tag3"]

    df = spark.sql(sql)
    df = df.drop_duplicates(["ucity_id", "device_id", "cid_id"])
    df = df.na.fill(dict(zip(features, features)))
    f = time.time()
    rdd = df.select("label", "y", "z", "ucity_id", "device_id", "cid_id", "app_list", "level2_ids",
                    "level3_ids", "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
                    "ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top",
                    "time", "hospital_id", "treatment_method", "price_min", "price_max",
                    "treatment_time", "maintain_time", "recover_time", "search_tag2", "search_tag3") \
        .rdd.repartition(200).map(
            lambda x: (x[0], float(x[1]), float(x[2]), x[3], x[4], x[5],
                       app_list_func(x[6], app_list_map), app_list_func(x[7], leve2_map),
                       app_list_func(x[8], leve3_map), app_list_func(x[9], leve2_map),
                       app_list_func(x[10], leve2_map), app_list_func(x[11], leve2_map),
                       app_list_func(x[12], leve2_map), app_list_func(x[13], leve2_map),
                       app_list_func(x[14], leve2_map), app_list_func(x[15], leve2_map),
                       [value_map.get(date, 1), value_map.get(x[16], 2), value_map.get(x[17], 3),
                        value_map.get(x[18], 4), value_map.get(x[19], 5), value_map.get(x[20], 6),
                        value_map.get(x[21], 7), value_map.get(x[22], 8), value_map.get(x[23], 9),
                        value_map.get(x[24], 10), value_map.get(x[25], 11), value_map.get(x[26], 12),
                        value_map.get(x[27], 13), value_map.get(x[28], 14), value_map.get(x[29], 15)],
                       app_list_func(x[30], leve2_map), app_list_func(x[31], leve3_map)))
    rdd.persist(storageLevel=StorageLevel.MEMORY_ONLY_SER)

    print("预测集样本大小:")  # prediction-set sample count
    print(rdd.count())

    if rdd.filter(lambda x: x[0] == 0).count() > 0:
        print("预测集native有数据")  # native prediction set is non-empty
        spark.createDataFrame(rdd.filter(lambda x: x[0] == 0).map(
            lambda x: (x[1], x[2], x[6], x[7], x[8], x[9], x[10], x[11], x[12], x[13], x[14],
                       x[15], x[16], x[17], x[18], x[3], x[4], x[5]))) \
            .toDF("y", "z", "app_list", "level2_list", "level3_list", "tag1_list", "tag2_list",
                  "tag3_list", "tag4_list", "tag5_list", "tag6_list", "tag7_list", "ids",
                  "search_tag2_list", "search_tag3_list", "city", "uid", "cid_id") \
            .repartition(1).write.format("tfrecords").save(path=path + "native/", mode="overwrite")
        print("native tfrecord done")
        h = time.time()
        print((h - f) / 60)
    else:
        print("预测集native为空")  # native prediction set is empty

    if rdd.filter(lambda x: x[0] == 1).count() > 0:
        print("预测集nearby有数据")  # nearby prediction set is non-empty
        spark.createDataFrame(rdd.filter(lambda x: x[0] == 1).map(
            lambda x: (x[1], x[2], x[6], x[7], x[8], x[9], x[10], x[11], x[12], x[13], x[14],
                       x[15], x[16], x[17], x[18], x[3], x[4], x[5]))) \
            .toDF("y", "z", "app_list", "level2_list", "level3_list", "tag1_list", "tag2_list",
                  "tag3_list", "tag4_list", "tag5_list", "tag6_list", "tag7_list", "ids",
                  "search_tag2_list", "search_tag3_list", "city", "uid", "cid_id") \
            .repartition(1).write.format("tfrecords").save(path=path + "nearby/", mode="overwrite")
        print("nearby tfrecord done")
    else:
        print("预测集nearby为空")  # nearby prediction set is empty
if __name__ == '__main__':
    sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
        .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
        .set("spark.tispark.plan.allow_index_double_read", "false") \
        .set("spark.tispark.plan.allow_index_read", "true") \
        .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
        .set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf") \
        .set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec", "snappy")

    spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
    ti = pti.TiContext(spark)
    ti.tidbMapDatabase("jerry_test")
    ti.tidbMapDatabase("eagle")
    spark.sparkContext.setLogLevel("WARN")
    path = "hdfs:///strategy/esmm/"
    local_path = "/home/gmuser/esmm/"

    validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer()
    get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map)

    spark.stop()
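The tr/, va/, native/ and nearby/ TFRecord outputs written here are what train_bias.py (below) consumes. A quick way to eyeball one serialized example with the TF 1.x API (the path is illustrative; copy a part file locally, or use an hdfs:// path if your TF build has HDFS support):

import tensorflow as tf

# print the first Example proto from a part file, then stop
for record in tf.python_io.tf_record_iterator("part-r-00000"):
    print(tf.train.Example.FromString(record))
    break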
eda/esmm/Model_pipline/train_bias.py (new file, 0 → 100644)
# coding=utf-8
import pymysql
import os
import json
from datetime import date, timedelta
import tensorflow as tf
import time
import pandas as pd
import datetime

#################### CMD Arguments ####################
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_integer("dist_mode", 0, "distribution mode {0-local, 1-single_dist, 2-multi_dist}")
tf.app.flags.DEFINE_string("ps_hosts", '', "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", '', "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("job_name", '', "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("num_threads", 16, "Number of threads")
tf.app.flags.DEFINE_integer("feature_size", 0, "Number of features")
tf.app.flags.DEFINE_integer("field_size", 0, "Number of common fields")
tf.app.flags.DEFINE_integer("embedding_size", 32, "Embedding size")
tf.app.flags.DEFINE_integer("num_epochs", 10, "Number of epochs")
tf.app.flags.DEFINE_integer("batch_size", 64, "Batch size")
tf.app.flags.DEFINE_integer("log_steps", 1000, "Save summary every this many steps")
tf.app.flags.DEFINE_float("learning_rate", 0.0005, "learning rate")
tf.app.flags.DEFINE_float("l2_reg", 0.0001, "L2 regularization")
tf.app.flags.DEFINE_string("loss_type", 'log_loss', "loss type {square_loss, log_loss}")
tf.app.flags.DEFINE_float("ctr_task_wgt", 0.5, "loss weight of ctr task")
tf.app.flags.DEFINE_string("optimizer", 'Adam', "optimizer type {Adam, Adagrad, GD, Momentum}")
tf.app.flags.DEFINE_string("deep_layers", '256,128,64', "deep layers")
tf.app.flags.DEFINE_string("dropout", '0.5,0.5,0.5', "dropout rate")
tf.app.flags.DEFINE_boolean("batch_norm", False, "perform batch normalization (True or False)")
tf.app.flags.DEFINE_float("batch_norm_decay", 0.9, "decay for the moving average (recommend trying decay=0.9)")
tf.app.flags.DEFINE_string("hdfs_dir", '', "hdfs dir")
tf.app.flags.DEFINE_string("local_dir", '', "local dir")
tf.app.flags.DEFINE_string("dt_dir", '', "data dt partition")
tf.app.flags.DEFINE_string("model_dir", '', "model checkpoint dir")
tf.app.flags.DEFINE_string("servable_model_dir", '', "export servable model for TensorFlow Serving")
tf.app.flags.DEFINE_string("task_type", 'train', "task type {train, infer, eval, export}")
tf.app.flags.DEFINE_boolean("clear_existing_model", False, "clear existing model or not")
def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
    print('Parsing', filenames)

    def _parse_fn(record):
        features = {
            "y": tf.FixedLenFeature([], tf.float32),
            "z": tf.FixedLenFeature([], tf.float32),
            "ids": tf.FixedLenFeature([FLAGS.field_size], tf.int64),
            "app_list": tf.VarLenFeature(tf.int64),
            "level2_list": tf.VarLenFeature(tf.int64),
            "level3_list": tf.VarLenFeature(tf.int64),
            "search_tag2_list": tf.VarLenFeature(tf.int64),
            "search_tag3_list": tf.VarLenFeature(tf.int64),
            "uid": tf.VarLenFeature(tf.string),
            "city": tf.VarLenFeature(tf.string),
            "cid_id": tf.VarLenFeature(tf.string)
        }
        parsed = tf.parse_single_example(record, features)
        y = parsed.pop('y')
        z = parsed.pop('z')
        return parsed, {"y": y, "z": z}

    # Extract lines from input files using the Dataset API; can pass one filename or a filename list
    # dataset = tf.data.TFRecordDataset(filenames).map(_parse_fn, num_parallel_calls=8).prefetch(500000)  # multi-thread pre-process then prefetch
    # Randomizes input using a window of 256 elements (read into memory)
    # if perform_shuffle:
    #     dataset = dataset.shuffle(buffer_size=256)
    # epochs from blending together.
    # dataset = dataset.repeat(num_epochs)
    # dataset = dataset.batch(batch_size)  # Batch size to use
    files = tf.data.Dataset.list_files(filenames)
    dataset = files.apply(
        tf.data.experimental.parallel_interleave(
            lambda file: tf.data.TFRecordDataset(file),
            cycle_length=8
        )
    )
    dataset = dataset.apply(tf.data.experimental.map_and_batch(map_func=_parse_fn, batch_size=batch_size,
                                                               num_parallel_calls=8))
    dataset = dataset.prefetch(10000)
    # dataset = dataset.padded_batch(batch_size, padded_shapes=({"feeds_ids": [None], "feeds_vals": [None], "title_ids": [None]}, [None]))  # pad variable-length features
    # return dataset.make_one_shot_iterator()
    iterator = dataset.make_one_shot_iterator()
    batch_features, batch_labels = iterator.get_next()
    # return tf.reshape(batch_ids, shape=[-1, field_size]), tf.reshape(batch_vals, shape=[-1, field_size]), batch_labels
    # print("-"*100)
    # print(batch_features, batch_labels)
    return batch_features, batch_labels
def model_fn(features, labels, mode, params):
    """Build Model function f(x) for Estimator."""
    # ------hyperparameters----
    field_size = params["field_size"]
    feature_size = params["feature_size"]
    embedding_size = params["embedding_size"]
    l2_reg = params["l2_reg"]
    learning_rate = params["learning_rate"]
    # optimizer = params["optimizer"]
    layers = list(map(int, params["deep_layers"].split(',')))
    dropout = list(map(float, params["dropout"].split(',')))
    ctr_task_wgt = params["ctr_task_wgt"]
    common_dims = field_size * embedding_size

    # ------build weights------
    Feat_Emb = tf.get_variable(name='embeddings', shape=[feature_size, embedding_size],
                               initializer=tf.glorot_normal_initializer())

    feat_ids = features['ids']
    app_list = features['app_list']
    level2_list = features['level2_list']
    level3_list = features['level3_list']
    search_tag2_list = features['search_tag2_list']
    search_tag3_list = features['search_tag3_list']
    uid = features['uid']
    city = features['city']
    cid_id = features['cid_id']

    if FLAGS.task_type != "infer":
        y = labels['y']
        z = labels['z']

    # ------build f(x)------
    with tf.variable_scope("Shared-Embedding-layer"):
        embedding_id = tf.nn.embedding_lookup(Feat_Emb, feat_ids)
        app_id = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=app_list, sp_weights=None, combiner="sum")
        level2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=level2_list, sp_weights=None, combiner="sum")
        level3 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=level3_list, sp_weights=None, combiner="sum")
        search_tag2 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=search_tag2_list, sp_weights=None, combiner="sum")
        search_tag3 = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=search_tag3_list, sp_weights=None, combiner="sum")

        # x_concat = tf.reshape(embedding_id, shape=[-1, common_dims])  # None * (F * K)
        x_concat = tf.concat([tf.reshape(embedding_id, shape=[-1, common_dims]),
                              app_id, level2, level3, search_tag2, search_tag3], axis=1)

        uid = tf.sparse.to_dense(uid, default_value="")
        city = tf.sparse.to_dense(city, default_value="")
        cid_id = tf.sparse.to_dense(cid_id, default_value="")
    with tf.name_scope("CVR_Task"):
        if mode == tf.estimator.ModeKeys.TRAIN:
            train_phase = True
        else:
            train_phase = False
        x_cvr = x_concat
        for i in range(len(layers)):
            x_cvr = tf.contrib.layers.fully_connected(inputs=x_cvr, num_outputs=layers[i],
                                                      weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg),
                                                      scope='cvr_mlp%d' % i)
            if FLAGS.batch_norm:
                x_cvr = batch_norm_layer(x_cvr, train_phase=train_phase, scope_bn='cvr_bn_%d' % i)
                # placed after ReLU: https://github.com/ducha-aiki/caffenet-benchmark/blob/master/batchnorm.md#bn----before-or-after-relu
            if mode == tf.estimator.ModeKeys.TRAIN:
                x_cvr = tf.nn.dropout(x_cvr, keep_prob=dropout[i])
                # Apply Dropout after all BN layers and set dropout=0.8 (drop_ratio=0.2)

        y_cvr = tf.contrib.layers.fully_connected(inputs=x_cvr, num_outputs=1, activation_fn=tf.identity,
                                                  weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg),
                                                  scope='cvr_out')
        y_cvr = tf.reshape(y_cvr, shape=[-1])

    with tf.name_scope("CTR_Task"):
        if mode == tf.estimator.ModeKeys.TRAIN:
            train_phase = True
        else:
            train_phase = False
        x_ctr = x_concat
        for i in range(len(layers)):
            x_ctr = tf.contrib.layers.fully_connected(inputs=x_ctr, num_outputs=layers[i],
                                                      weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg),
                                                      scope='ctr_mlp%d' % i)
            if FLAGS.batch_norm:
                x_ctr = batch_norm_layer(x_ctr, train_phase=train_phase, scope_bn='ctr_bn_%d' % i)
                # placed after ReLU: https://github.com/ducha-aiki/caffenet-benchmark/blob/master/batchnorm.md#bn----before-or-after-relu
            if mode == tf.estimator.ModeKeys.TRAIN:
                x_ctr = tf.nn.dropout(x_ctr, keep_prob=dropout[i])
                # Apply Dropout after all BN layers and set dropout=0.8 (drop_ratio=0.2)

        y_ctr = tf.contrib.layers.fully_connected(inputs=x_ctr, num_outputs=1, activation_fn=tf.identity,
                                                  weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg),
                                                  scope='ctr_out')
        y_ctr = tf.reshape(y_ctr, shape=[-1])
    with tf.variable_scope("MTL-Layer"):
        pctr = tf.sigmoid(y_ctr)
        pcvr = tf.sigmoid(y_cvr)
        pctcvr = pctr * pcvr

    predictions = {"pctcvr": pctcvr, "uid": uid, "city": city, "cid_id": cid_id}
    export_outputs = {
        tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:
            tf.estimator.export.PredictOutput(predictions)}
    # Provide an estimator spec for `ModeKeys.PREDICT`
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions, export_outputs=export_outputs)

    if FLAGS.task_type != "infer":
        # ------build loss------
        ctr_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y_ctr, labels=y))
        # cvr_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y_ctcvr, labels=z))
        cvr_loss = tf.reduce_mean(tf.losses.log_loss(predictions=pctcvr, labels=z))
        loss = ctr_task_wgt * ctr_loss + (1 - ctr_task_wgt) * cvr_loss + l2_reg * tf.nn.l2_loss(Feat_Emb)

        tf.summary.scalar('ctr_loss', ctr_loss)
        tf.summary.scalar('cvr_loss', cvr_loss)

        # Provide an estimator spec for `ModeKeys.EVAL`
        eval_metric_ops = {
            "CTR_AUC": tf.metrics.auc(y, pctr),
            # "CTR_F1": tf.contrib.metrics.f1_score(y, pctr),
            # "CTR_Precision": tf.metrics.precision(y, pctr),
            # "CTR_Recall": tf.metrics.recall(y, pctr),
            "CVR_AUC": tf.metrics.auc(z, pcvr),
            "CTCVR_AUC": tf.metrics.auc(z, pctcvr)
        }
        if mode == tf.estimator.ModeKeys.EVAL:
            return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions, loss=loss,
                                              eval_metric_ops=eval_metric_ops)

        # ------build optimizer------
        if FLAGS.optimizer == 'Adam':
            optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate, beta1=0.9, beta2=0.999, epsilon=1e-8)
        elif FLAGS.optimizer == 'Adagrad':
            optimizer = tf.train.AdagradOptimizer(learning_rate=learning_rate, initial_accumulator_value=1e-8)
        elif FLAGS.optimizer == 'Momentum':
            optimizer = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.95)
        elif FLAGS.optimizer == 'ftrl':
            optimizer = tf.train.FtrlOptimizer(learning_rate)

        train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())

        # Provide an estimator spec for `ModeKeys.TRAIN` modes
        if mode == tf.estimator.ModeKeys.TRAIN:
            return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions, loss=loss, train_op=train_op)
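The structure above is the standard ESMM factorization: the CVR tower is never fit against a conversion label directly; instead pCTCVR = pCTR * pCVR is fit against z over the full impression space, alongside the CTR loss on y. A numpy sketch of the combined objective (my construction with toy numbers, mirroring the formulas above; the embedding-L2 term from the code is omitted):

import numpy as np

def combined_loss(y, z, logit_ctr, logit_cvr, w=0.5, eps=1e-7):
    # pCTR and pCVR come from the two tower logits; pCTCVR is their product
    pctr = 1.0 / (1.0 + np.exp(-logit_ctr))
    pcvr = 1.0 / (1.0 + np.exp(-logit_cvr))
    pctcvr = pctr * pcvr
    # sigmoid cross-entropy on the click label, log loss on the click-and-convert label
    ctr_loss = -np.mean(y * np.log(pctr + eps) + (1 - y) * np.log(1 - pctr + eps))
    cvr_loss = -np.mean(z * np.log(pctcvr + eps) + (1 - z) * np.log(1 - pctcvr + eps))
    return w * ctr_loss + (1 - w) * cvr_loss

y = np.array([1.0, 0.0, 1.0])  # click labels
z = np.array([1.0, 0.0, 0.0])  # click-and-convert labels
print(combined_loss(y, z, np.array([2.0, -1.0, 0.5]), np.array([1.0, 0.0, -0.5])))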
def batch_norm_layer(x, train_phase, scope_bn):
    bn_train = tf.contrib.layers.batch_norm(x, decay=FLAGS.batch_norm_decay, center=True, scale=True,
                                            updates_collections=None, is_training=True, reuse=None,
                                            scope=scope_bn)
    bn_infer = tf.contrib.layers.batch_norm(x, decay=FLAGS.batch_norm_decay, center=True, scale=True,
                                            updates_collections=None, is_training=False, reuse=True,
                                            scope=scope_bn)
    z = tf.cond(tf.cast(train_phase, tf.bool), lambda: bn_train, lambda: bn_infer)
    return z
def set_dist_env():
    if FLAGS.dist_mode == 1:  # local distributed test mode: 1 chief, 1 ps, 1 evaluator
        ps_hosts = FLAGS.ps_hosts.split(',')
        chief_hosts = FLAGS.chief_hosts.split(',')  # NOTE: no chief_hosts flag is defined above, so dist_mode=1 would fail as written
        task_index = FLAGS.task_index
        job_name = FLAGS.job_name
        print('ps_host', ps_hosts)
        print('chief_hosts', chief_hosts)
        print('job_name', job_name)
        print('task_index', str(task_index))
        # no worker entries
        tf_config = {
            'cluster': {'chief': chief_hosts, 'ps': ps_hosts},
            'task': {'type': job_name, 'index': task_index}
        }
        print(json.dumps(tf_config))
        os.environ['TF_CONFIG'] = json.dumps(tf_config)
    elif FLAGS.dist_mode == 2:  # cluster distributed mode
        ps_hosts = FLAGS.ps_hosts.split(',')
        worker_hosts = FLAGS.worker_hosts.split(',')
        chief_hosts = worker_hosts[0:1]  # get first worker as chief
        worker_hosts = worker_hosts[2:]  # the rest as worker
        task_index = FLAGS.task_index
        job_name = FLAGS.job_name
        print('ps_host', ps_hosts)
        print('worker_host', worker_hosts)
        print('chief_hosts', chief_hosts)
        print('job_name', job_name)
        print('task_index', str(task_index))
        # use #worker=0 as chief
        if job_name == "worker" and task_index == 0:
            job_name = "chief"
        # use #worker=1 as evaluator
        if job_name == "worker" and task_index == 1:
            job_name = 'evaluator'
            task_index = 0
        # the others as worker
        if job_name == "worker" and task_index > 1:
            task_index -= 2

        tf_config = {
            'cluster': {'chief': chief_hosts, 'worker': worker_hosts, 'ps': ps_hosts},
            'task': {'type': job_name, 'index': task_index}
        }
        print(json.dumps(tf_config))
        os.environ['TF_CONFIG'] = json.dumps(tf_config)
def main(file_path):
    # ------check Arguments------
    if FLAGS.dt_dir == "":
        FLAGS.dt_dir = (date.today() + timedelta(-1)).strftime('%Y%m%d')
    FLAGS.model_dir = FLAGS.model_dir + FLAGS.dt_dir
    # FLAGS.data_dir = FLAGS.data_dir + FLAGS.dt_dir

    va_files = ["hdfs://172.16.32.4:8020/strategy/esmm/va/part-r-00000"]

    # if FLAGS.clear_existing_model:
    #     try:
    #         shutil.rmtree(FLAGS.model_dir)
    #     except Exception as e:
    #         print(e, "at clear_existing_model")
    #     else:
    #         print("existing model cleaned at %s" % FLAGS.model_dir)

    # set_dist_env()

    # ------build Tasks------
    model_params = {
        "field_size": FLAGS.field_size,
        "feature_size": FLAGS.feature_size,
        "embedding_size": FLAGS.embedding_size,
        "learning_rate": FLAGS.learning_rate,
        "l2_reg": FLAGS.l2_reg,
        "deep_layers": FLAGS.deep_layers,
        "dropout": FLAGS.dropout,
        "ctr_task_wgt": FLAGS.ctr_task_wgt
    }
    config = tf.estimator.RunConfig().replace(
        session_config=tf.ConfigProto(device_count={'GPU': 0, 'CPU': FLAGS.num_threads}),
        log_step_count_steps=FLAGS.log_steps, save_summary_steps=FLAGS.log_steps)
    Estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir=FLAGS.model_dir,
                                       params=model_params, config=config)

    if FLAGS.task_type == 'train':
        train_spec = tf.estimator.TrainSpec(
            input_fn=lambda: input_fn(file_path, num_epochs=FLAGS.num_epochs, batch_size=FLAGS.batch_size))
        eval_spec = tf.estimator.EvalSpec(
            input_fn=lambda: input_fn(va_files, num_epochs=1, batch_size=FLAGS.batch_size),
            steps=None, start_delay_secs=1000, throttle_secs=1200)
        result = tf.estimator.train_and_evaluate(Estimator, train_spec, eval_spec)
        for key, value in sorted(result[0].items()):
            print('%s: %s' % (key, value))
    elif FLAGS.task_type == 'eval':
        result = Estimator.evaluate(input_fn=lambda: input_fn(va_files, num_epochs=1, batch_size=FLAGS.batch_size))
        for key, value in sorted(result.items()):
            print('%s: %s' % (key, value))
    elif FLAGS.task_type == 'infer':
        preds = Estimator.predict(
            input_fn=lambda: input_fn(file_path, num_epochs=1, batch_size=FLAGS.batch_size),
            predict_keys=["pctcvr", "uid", "city", "cid_id"])
        result = []
        for prob in preds:
            result.append([str(prob["uid"][0]), str(prob["city"][0]), str(prob["cid_id"][0]), str(prob['pctcvr'])])
        return result
    elif FLAGS.task_type == 'export':
        print("Not Implemented, Do It Yourself!")
def trans(x):
    return str(x)[2:-1] if str(x)[0] == 'b' else x  # strip the b'...' wrapper from stringified bytes


def set_join(lst):
    l = lst.unique().tolist()
    r = [str(i) for i in l]
    r = r[:500]  # cap the queue at 500 ids
    return ','.join(r)


def df_sort(result, queue_name):
    df = pd.DataFrame(result, columns=["uid", "city", "cid_id", "pctcvr"])
    # print(df.head(10))
    df['uid1'] = df['uid'].apply(trans)
    df['city1'] = df['city'].apply(trans)
    df['cid_id1'] = df['cid_id'].apply(trans)

    df2 = df.groupby(by=["uid1", "city1"]).apply(lambda x: x.sort_values(by="pctcvr", ascending=False)) \
        .reset_index(drop=True).groupby(by=["uid1", "city1"]).agg({'cid_id1': set_join}).reset_index(drop=False)
    df2.columns = ["device_id", "city_id", queue_name]
    df2["time"] = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
    return df2
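df_sort groups the per-(uid, city) predictions, sorts each group by pctcvr descending, and joins the diary ids into one comma-separated queue string. A toy run (made-up rows; in the real pipeline the ids arrive as stringified bytes, which trans unwraps):

rows = [["b'u1'", "b'bj'", "b'd1'", 0.2],
        ["b'u1'", "b'bj'", "b'd2'", 0.9]]
print(df_sort(rows, "native_queue"))
# -> one row: device_id=u1, city_id=bj, native_queue="d2,d1", plus a timestamp column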
def update_or_insert(df2, queue_name):
    device_count = df2.shape[0]
    con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
                          db='jerry_test', charset='utf8')
    cur = con.cursor()
    try:
        for i in range(0, device_count):
            query = """INSERT INTO esmm_device_diary_queue (device_id, city_id, time, %s) VALUES('%s', '%s', '%s', '%s') \
                    ON DUPLICATE KEY UPDATE device_id='%s', city_id='%s', time='%s', %s='%s'""" % (
                queue_name, df2.device_id[i], df2.city_id[i], df2.time[i], df2[queue_name][i],
                df2.device_id[i], df2.city_id[i], df2.time[i], queue_name, df2[queue_name][i])
            cur.execute(query)
            con.commit()
        con.close()
        print("insert or update success")
    except Exception as e:
        print(e)
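update_or_insert interpolates values into the SQL with %-formatting, which breaks on quotes and is injection-prone. A hedged alternative sketch using pymysql's parameter binding for the values (my construction, not the commit's code; the column name still has to be interpolated, since placeholders cannot name columns, and VALUES() in ON DUPLICATE KEY UPDATE is MySQL/TiDB syntax):

def update_or_insert_bound(df2, queue_name, con):
    # sketch only: table/column names as in the function above; queue_name is
    # interpolated (assumed validated upstream), values go through placeholders
    query = ("INSERT INTO esmm_device_diary_queue (device_id, city_id, time, %s) "
             "VALUES (%%s, %%s, %%s, %%s) "
             "ON DUPLICATE KEY UPDATE time=VALUES(time), %s=VALUES(%s)"
             % (queue_name, queue_name, queue_name))
    with con.cursor() as cur:
        for row in df2.itertuples(index=False):
            cur.execute(query, (row.device_id, row.city_id, row.time, getattr(row, queue_name)))
    con.commit()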
if __name__ == "__main__":
    b = time.time()
    path = "hdfs://172.16.32.4:8020/strategy/esmm/"
    tf.logging.set_verbosity(tf.logging.INFO)
    if FLAGS.task_type == 'train':
        print("train task")
        tr_files = ["hdfs://172.16.32.4:8020/strategy/esmm/tr/part-r-00000"]
        main(tr_files)
    elif FLAGS.task_type == 'infer':
        te_files = ["%s/part-r-00000" % FLAGS.hdfs_dir]
        queue_name = te_files[0].split('/')[-2] + "_queue"
        print(queue_name + " task")
        result = main(te_files)
        df = df_sort(result, queue_name)
        update_or_insert(df, queue_name)
    print("耗时(分钟):")  # elapsed time in minutes
    print((time.time() - b) / 60)