ML / ffm-baseline · Commit c0e265ca
Authored Jun 24, 2019 by Your Name
Parent: 9076bcda

Commit message: test
Showing 2 changed files with 22 additions and 21 deletions:

  eda/esmm/Model_pipline/dist_predict.py  +21 -20
  eda/esmm/Model_pipline/train.py          +1  -1
eda/esmm/Model_pipline/dist_predict.py

@@ -228,26 +228,27 @@ if __name__ == "__main__":
     # df = spark.read.format("tfrecords").load(path+"test_native/part-r-00000")
     # df.show()
-    te_files = []
-    for i in range(0,10):
-        te_files.append([path + "test_native/part-r-0000" + str(i)])
-    for i in range(10,100):
-        te_files.append([path + "test_native/part-r-000" + str(i)])
-    # te_files = ["hdfs://172.16.32.4:8020/strategy/esmm/test_native/part-r-00000"]
-    rdd_te_files = spark.sparkContext.parallelize(te_files)
-    print("-" * 100)
-    indices = rdd_te_files.repartition(100).map(lambda x: main(x))
-    # print(indices.take(1))
-    print("dist predict native")
-    te_result_dataframe = spark.createDataFrame(indices.flatMap(lambda x: x.split(";")).map(
-        lambda l: Row(sample_id=l.split(":")[0],uid=l.split(":")[1],city=l.split(":")[2],cid_id=l.split(":")[3],ctcvr=l.split(":")[4])))
-    # te_result_dataframe.show()
-    te_result_dataframe.repartition(50).write.format("parquet").save(path=path+"native_result/",mode="overwrite")
+    # te_files = []
+    # for i in range(0,10):
+    #     te_files.append([path + "test_native/part-r-0000" + str(i)])
+    # for i in range(10,100):
+    #     te_files.append([path + "test_native/part-r-000" + str(i)])
+    te_files = ["hdfs://172.16.32.4:8020/strategy/esmm/test_native/part-r-00000"]
+    main(te_files)
+    # rdd_te_files = spark.sparkContext.parallelize(te_files)
+    # print("-" * 100)
+    # indices = rdd_te_files.repartition(100).map(lambda x: main(x))
+    # # print(indices.take(1))
+    # print("dist predict native")
+    # te_result_dataframe = spark.createDataFrame(indices.flatMap(lambda x: x.split(";")).map(
+    #     lambda l: Row(sample_id=l.split(":")[0],uid=l.split(":")[1],city=l.split(":")[2],cid_id=l.split(":")[3],ctcvr=l.split(":")[4])))
+    #
+    # # te_result_dataframe.show()
+    #
+    # te_result_dataframe.repartition(50).write.format("parquet").save(path=path+"native_result/",mode="overwrite")
     print("耗时(秒):")
     print((time.time() - b))
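Context for the hunk above: the commit disables the distributed scoring path and instead runs main() on a single part file, presumably as a quick test, which matches the commit message; the trailing context lines print 耗时(秒), i.e. "elapsed time in seconds". The now commented-out block is a simple scatter-gather: parallelize a list of HDFS part-file paths, let each Spark task score one file via main(), and parse the ";"-joined, ":"-separated result strings into Rows. The sketch below shows the shape of that pattern; the stub main() is hypothetical, standing in for the real model-scoring function defined earlier in dist_predict.py, and the paths and records are fabricated for illustration.

# Minimal, runnable sketch of the scatter-gather predict pattern, with a stub main().
from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.appName("dist_predict_sketch").getOrCreate()

def main(files):
    # Hypothetical stand-in for the real predict function: the real one loads
    # the given TFRecord part files, runs the ESMM model over them, and returns
    # records of the form "sample_id:uid:city:cid_id:ctcvr" joined by ";".
    return ";".join("%d:u%d:beijing:c%d:0.05" % (i, i, i) for i, _ in enumerate(files))

# One-element lists, one per task, mirroring te_files in the real script.
te_files = [["part-r-0000%d" % i] for i in range(10)]
rdd_te_files = spark.sparkContext.parallelize(te_files)
# Repartition so each task scores one file, then map main() over the tasks.
indices = rdd_te_files.repartition(10).map(main)
rows = (indices.flatMap(lambda x: x.split(";"))
               .map(lambda l: Row(sample_id=l.split(":")[0], uid=l.split(":")[1],
                                  city=l.split(":")[2], cid_id=l.split(":")[3],
                                  ctcvr=l.split(":")[4])))
spark.createDataFrame(rows).show()

Side note on the disabled loops: the two range() loops hand-pad the part index so that "part-r-0000" + str(i) and "part-r-000" + str(i) line up; a single loop using "part-r-" + str(i).zfill(5) would cover all 100 files. Likewise, repartition(100) matches the 100 part files, so each task handles exactly one.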
eda/esmm/Model_pipline/train.py

@@ -383,7 +383,7 @@ if __name__ == "__main__":
     b = time.time()
     path = "hdfs://172.16.32.4:8020/strategy/esmm/"
     tf.logging.set_verbosity(tf.logging.INFO)
-    te_files = ["hdfs://172.16.32.4:8020/strategy/esmm/test_nearby/part-r-00000"]
+    te_files = ["hdfs://172.16.32.4:8020/strategy/esmm/test_native/part-r-00000"]
     print("hello up")
     result = main(te_files)
     df = pd.DataFrame(result, columns=["sample_id","uid","city","cid_id","pctcvr"])
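The one-line change above points the training script's smoke test at the test_native split instead of test_nearby, matching the single-file test in dist_predict.py. The tail of the hunk turns main()'s output into a pandas DataFrame; a hedged sketch of that step, assuming (the diff does not show it) that main returns an iterable of 5-item rows in (sample_id, uid, city, cid_id, pctcvr) order:

import pandas as pd

# Fabricated example rows standing in for main(te_files)'s real output.
result = [("1", "u42", "beijing", "c7", 0.031),
          ("2", "u43", "shanghai", "c9", 0.054)]
df = pd.DataFrame(result, columns=["sample_id", "uid", "city", "cid_id", "pctcvr"])
print(df.head())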