ffm-baseline: Commit 8cdf8d69
authored Dec 05, 2018 by 王志伟
Merge branch 'master' of http://git.wanmeizhensuo.com/ML/ffm-baseline
Parents: 838c47ab, 700d7aa9
Showing 1 changed file with 218 additions and 0 deletions (+218 / -0)

eda/feededa/src/main/scala/com/gmei/Data2FFM.scala (new file, 0 → 100644)
package com.gmei

import java.io.{File, PrintWriter, Serializable}

import com.gmei.GmeiConfig
import com.gmei.lib.AbstractParams
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, TiContext}
import scopt.OptionParser

import scala.util.Random
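// Data2FFM reads ESMM training rows from TiDB, index-encodes every
// categorical column, and writes the rows back to TiDB as train/cv tables
// in what appears to be the libffm "field:feature:value" text layout.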
object Data2FFM {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
  case class Params(env: String = "dev") extends AbstractParams[Params] with Serializable

  val defaultParams = Params()
  val parser = new OptionParser[Params]("Feed_EDA") {
    head("EsmmData")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin + s"| --env ${defaultParams.env}")
  }
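  // Small local-file helper; note it is never called from main below.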
  def writecsv(path: String, str: String): Unit = {
    val writer = new PrintWriter(new File(path))
    writer.write(str)
    writer.close()
    println("写入成功") // "write succeeded"
  }
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_train_data")
      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_pre_data")
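      // tidbMapTable registers the TiDB tables with Spark so they can be
      // queried through sc.sql below.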
      val yesteday_have_seq = GmeiConfig.getMinusNDate(5)

      val esmm_data = sc.sql(
        s"""
           |select device_id,y,z,stat_date,ucity_id,cid_id,diary_service_id,clevel1_id,slevel1_id,ccity_name,scity_id
           |from esmm_train_data
         """.stripMargin).na.drop()
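      // na.drop() discards any row containing a null before encoding.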
      val column_list = esmm_data.columns
      val max_stat_date = sc.sql(
        s"""
           |select max(stat_date) from esmm_train_data
         """.stripMargin)
      println("------------------------")
      val max_stat_date_str = max_stat_date.collect().map(s => s(0).toString).head
      println(max_stat_date_str)
      println(column_list.slice(0, 2).toList)
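      // The newest stat_date is used as the holdout: rows matching it become
      // the cv set below, everything earlier becomes the train set.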
      val column_number = scala.collection.mutable.Map[String, Array[String]]()
      for (i <- column_list) {
        column_number(i) = esmm_data.select(i).distinct().collect().map(x => x(0).toString)
      }
      println("dict")
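      // column_number maps each column name to the array of its distinct
      // values; a value's position in that array (via indexOf below) serves
      // as its integer feature id.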
      val rdd = esmm_data.rdd.repartition(200)
        .map(x => (x(0).toString, x(1).toString, x(2).toString, x(3).toString,
          x(4).toString, x(5).toString, x(6).toString, x(7).toString,
          x(8).toString, x(9).toString, x(10).toString))
      rdd.persist()

      import sc.implicits._
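      // Train set: rows whose stat_date is not the newest one. Each row
      // becomes (random key, "rowIndex,y,z,<features>"), where the features
      // use fields 1-8 for stat_date, ucity_id, cid_id, diary_service_id,
      // clevel1_id, slevel1_id, ccity_name and scity_id, each with a
      // constant value of 1.0.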
      val train = rdd.filter(x => x._4 != max_stat_date_str)
        .map(x => (x._1, x._2, x._3,
          column_number("stat_date").indexOf(x._4),
          column_number("ucity_id").indexOf(x._5),
          column_number("cid_id").indexOf(x._6),
          column_number("diary_service_id").indexOf(x._7),
          column_number("clevel1_id").indexOf(x._8),
          column_number("slevel1_id").indexOf(x._9),
          column_number("ccity_name").indexOf(x._10),
          column_number("scity_id").indexOf(x._11)))
        .map(x => ((new util.Random).nextInt(2147483647), x._2, x._3,
          "1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0"
            .format(x._4, x._5, x._6, x._7, x._8, x._9, x._10, x._11)))
        .zipWithIndex()
        .map(x => (x._1._1, x._2, x._1._2, x._1._3, x._1._4))
        .map(x => (x._1, x._2 + "," + x._3 + "," + x._4 + "," + x._5))
        .toDF("number", "data")
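      // Persist the encoded train set to TiDB over JDBC, replacing any
      // previous run's table.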
      val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
      GmeiConfig.writeToJDBCTable(jdbcuri, train, "esmm_data2ffm_train", SaveMode.Overwrite)
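      // CV set: the same encoding applied to the rows from the newest
      // stat_date.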
      val test = rdd.filter(x => x._4 == max_stat_date_str)
        .map(x => (x._1, x._2, x._3,
          column_number("stat_date").indexOf(x._4),
          column_number("ucity_id").indexOf(x._5),
          column_number("cid_id").indexOf(x._6),
          column_number("diary_service_id").indexOf(x._7),
          column_number("clevel1_id").indexOf(x._8),
          column_number("slevel1_id").indexOf(x._9),
          column_number("ccity_name").indexOf(x._10),
          column_number("scity_id").indexOf(x._11)))
        .map(x => ((new util.Random).nextInt(2147483647), x._2, x._3,
          "1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0"
            .format(x._4, x._5, x._6, x._7, x._8, x._9, x._10, x._11)))
        .zipWithIndex()
        .map(x => (x._1._1, x._2, x._1._2, x._1._3, x._1._4))
        .map(x => (x._1, x._2 + "," + x._3 + "," + x._4 + "," + x._5))
        .toDF("number", "data")
      GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_cv", SaveMode.Overwrite)

      sc.stop()
    }
  }
}
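// Data2FFMInfer applies the same encoding to esmm_pre_data (presumably the
// candidate rows to be scored) and writes the result to esmm_data2ffm_infer.
// It duplicates most of Data2FFM, except that no train/cv split is needed.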
object Data2FFMInfer {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev") extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("EsmmData")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin + s"| --env ${defaultParams.env}")
  }

  def writecsv(path: String, str: String): Unit = {
    val writer = new PrintWriter(new File(path))
    writer.write(str)
    writer.close()
    println("写入成功") // "write succeeded"
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_train_data")
      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_pre_data")

      val yesteday_have_seq = GmeiConfig.getMinusNDate(5)
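      // Inference input comes from esmm_pre_data rather than esmm_train_data.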
      val esmm_data = sc.sql(
        s"""
           |select device_id,y,z,stat_date,ucity_id,cid_id,diary_service_id,clevel1_id,slevel1_id,ccity_name,scity_id
           |from esmm_pre_data
         """.stripMargin).na.drop()

      val column_list = esmm_data.columns
      println(column_list.slice(0, 2).toList)

      val column_number = scala.collection.mutable.Map[String, Array[String]]()
      for (i <- column_list) {
        column_number(i) = esmm_data.select(i).distinct().collect().map(x => x(0).toString)
      }
      println("dict")

      val rdd = esmm_data.rdd.repartition(200)
        .map(x => (x(0).toString, x(1).toString, x(2).toString, x(3).toString,
          x(4).toString, x(5).toString, x(6).toString, x(7).toString,
          x(8).toString, x(9).toString, x(10).toString))
      rdd.persist()

      import sc.implicits._
      val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
      val test = rdd
        .map(x => (x._1, x._2, x._3,
          column_number("stat_date").indexOf(x._4),
          column_number("ucity_id").indexOf(x._5),
          column_number("cid_id").indexOf(x._6),
          column_number("diary_service_id").indexOf(x._7),
          column_number("clevel1_id").indexOf(x._8),
          column_number("slevel1_id").indexOf(x._9),
          column_number("ccity_name").indexOf(x._10),
          column_number("scity_id").indexOf(x._11)))
        .map(x => ((new util.Random).nextInt(2147483647), x._2, x._3,
          "1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0"
            .format(x._4, x._5, x._6, x._7, x._8, x._9, x._10, x._11)))
        .zipWithIndex()
        .map(x => (x._1._1, x._2, x._1._2, x._1._3, x._1._4))
        .map(x => (x._1, x._2 + "," + x._3 + "," + x._4 + "," + x._5))
        .toDF("number", "data")
      GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_infer", SaveMode.Overwrite)

      sc.stop()
    }
  }
}