ML / ffm-baseline / Commits / 82076f91

Commit 82076f91
authored Apr 29, 2019 by 张彦钊
Put the most recent day's dataset into the training set
parent 41528b41
Showing 1 changed file with 69 additions and 69 deletions

tensnsorflow/multi.py  +69 -69
tensnsorflow/multi.py  (view file @ 82076f91)
@@ -91,7 +91,7 @@ def feature_engineer():
                    "channel", "top", "time", "hospital_id", "treatment_method", "price_min",
                    "price_max", "treatment_time", "maintain_time", "recover_time", "y", "z",).rdd
    rdd.persist()
    # TODO: remove the train filter below after going live, because the most recent day's data should also be used as training data
    train = rdd.filter(lambda x: x[3] != validate_date).map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], leve2_map),
                                                                       app_list_func(x[2], leve3_map), value_map[x[3]], value_map[x[4]],
                                                                       value_map[x[5]], value_map[x[6]], value_map[x[7]], value_map[x[8]],
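The commit message and the TODO above both point at the filter in this hunk: training rows are currently kept only when x[3] differs from validate_date, so the most recent day is held out. A minimal sketch of that split, using plain Python lists in place of the Spark RDD (the sample rows, the dates, and the assumption that x[3] holds the row's stat_date are illustrative, not taken from the repo):

    validate_date = "2019-04-29"                       # hypothetical most recent day
    rows = [("a1", "l2", "l3", "2019-04-27"),          # x[3] assumed to be the stat_date field
            ("a2", "l2", "l3", "2019-04-28"),
            ("a3", "l2", "l3", "2019-04-29")]

    train = [x for x in rows if x[3] != validate_date]  # current behaviour: newest day excluded
    train_all = list(rows)                              # behaviour the TODO asks for: keep every day

    print(len(train), len(train_all))                   # 2 3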
@@ -114,71 +114,71 @@ def feature_engineer():
    return validate_date, value_map, app_list_map, leve2_map, leve3_map


def get_predict(date, value_map, app_list_map, level2_map, level3_map):
    sql = "select e.y,e.z,e.label,e.ucity_id,feat.level2_ids,e.ccity_name," \
          "u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \
          "dl.app_list,e.hospital_id,feat.level3_ids," \
          "k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \
          "from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
          "left join cid_type_top c on e.device_id = c.device_id " \
          "left join cid_time_cut cut on e.cid_id = cut.cid " \
          "left join device_app_list dl on e.device_id = dl.device_id " \
          "left join diary_feat feat on e.cid_id = feat.diary_id " \
          "left join train_Knowledge_network_data k on feat.level2 = k.level2_id"

    features = ["app_list", "level2_ids", "level3_ids", "ucity_id", "ccity_name", "device_type", "manufacturer",
                "channel", "top", "time", "hospital_id",
                "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time"]

    df = spark.sql(sql)

    df = df.na.fill(dict(zip(features, features)))

    df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
                             "device_id", "cid_id", "label",
                             "channel", "top", "time", "app_list", "hospital_id", "level3_ids"])

    rdd = df.select("app_list", "level2_ids", "level3_ids", "ucity_id", "device_id", "cid_id", "label", "y", "z",
                    "ccity_name", "device_type", "manufacturer", "channel", "top", "time", "hospital_id",
                    "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
                    "recover_time") \
        .rdd.map(lambda x: (app_list_func(x[0], app_list_map), app_list_func(x[1], level2_map),
                            app_list_func(x[2], level3_map), x[3], x[4], x[5], x[6], x[7], x[8],
                            value_map[x[3]], value_map[x[9]],
                            value_map[x[10]], value_map[x[11]], value_map[x[12]], value_map[x[13]],
                            value_map[x[14]], value_map[x[15]], value_map[x[16]], value_map[x[17]],
                            value_map[x[18]], value_map[x[19]], value_map[x[20]], value_map[x[21]],
                            value_map[date]))

    rdd.persist()

    native_pre = spark.createDataFrame(rdd.filter(lambda x: x[6] == 0).map(lambda x: (x[3], x[4], x[5]))) \
        .toDF("city", "uid", "cid_id")
    print("native")
    print(native_pre.count())
    native_pre.write.csv('/recommend', mode='overwrite', header=True)

    spark.createDataFrame(rdd.filter(lambda x: x[6] == 0)
                          .map(lambda x: (x[0], x[1], x[2], x[9], x[10], x[11], x[12], x[13], x[14], x[15],
                                          x[16], x[17], x[18], x[19], x[20], x[21], x[22], x[23]))) \
        .toDF("app_list", "level2_ids", "level3_ids", "ucity_id",
              "ccity_name", "device_type", "manufacturer", "channel", "top", "time", "hospital_id",
              "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
              "recover_time", "stat_date").write.csv('/recommend/native', mode='overwrite', header=True)

    nearby_pre = spark.createDataFrame(rdd.filter(lambda x: x[6] == 1).map(lambda x: (x[3], x[4], x[5]))) \
        .toDF("city", "uid", "cid_id")
    print("nearby")
    print(nearby_pre.count())
    nearby_pre.write.csv('/recommend', mode='overwrite', header=True)

    spark.createDataFrame(rdd.filter(lambda x: x[6] == 1)
                          .map(lambda x: (x[0], x[1], x[2], x[9], x[10], x[11], x[12], x[13], x[14], x[15],
                                          x[16], x[17], x[18], x[19], x[20], x[21], x[22], x[23]))) \
        .toDF("app_list", "level2_ids", "level3_ids", "ucity_id",
              "ccity_name", "device_type", "manufacturer", "channel", "top", "time", "hospital_id",
              "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time",
              "recover_time", "stat_date").write.csv('/recommend/nearby', mode='overwrite', header=True)

    rdd.unpersist()


def con_sql(db, sql):
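One detail worth calling out in get_predict: df.na.fill(dict(zip(features, features))) fills NULLs in each listed column with the column's own name, so a missing value becomes a stable sentinel string before the value_map lookups (Spark only applies a string fill to string-typed columns and silently skips the rest). A minimal standalone sketch of that pattern, assuming a local SparkSession and toy data that are not part of multi.py:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").appName("na_fill_demo").getOrCreate()

    features = ["channel", "top"]
    df = spark.createDataFrame([("wechat", None), (None, "3")], ["channel", "top"])

    # each NULL is replaced by its column name, e.g. a missing "top" becomes the string "top"
    filled = df.na.fill(dict(zip(features, features)))
    filled.show()
    spark.stop()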
@@ -235,8 +235,8 @@ if __name__ == '__main__':
    ti = pti.TiContext(spark)
    ti.tidbMapDatabase("jerry_test")
    spark.sparkContext.setLogLevel("WARN")
    test()
    validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer()
    get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map)
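Both hunks index raw feature values through value_map, the dict returned by feature_engineer() in the driver block above, presumably mapping each distinct categorical value (city, channel, date, and so on) to a numeric id before the rows reach the model. A minimal sketch of that kind of encoding with made-up values; the helper name build_value_map and the sample data are hypothetical, not from the repo:

    def build_value_map(values):
        # assign a stable integer id to each distinct raw value (starting at 1 is an assumption)
        return {v: i + 1 for i, v in enumerate(sorted(set(values)))}

    value_map = build_value_map(["beijing", "shanghai", "wechat", "2019-04-29"])
    row = ("beijing", "wechat")
    print(value_map[row[0]], value_map[row[1]])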