ML / ffm-baseline · Commits

Commit ab7b7fd0, authored Apr 27, 2019 by 王志伟
Parent: c28cd871

Run the fluctuation-cause data job (跑波动原因数据)

Showing 1 changed file with 0 additions and 263 deletions:
eda/feededa/src/main/scala/com/gmei/xgboost_test.scala (deleted, mode 100644 → 0)
package com.gmei

import java.io.Serializable
import java.sql.{Connection, DriverManager}
import java.text.SimpleDateFormat
import java.util.Date

import com.gmei.WeafareStat.{defaultParams, parser}
import com.gmei.lib.AbstractParams
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
import org.apache.log4j.{Level, Logger}
//import org.apache.spark.ml
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.sql.{SaveMode, SparkSession}
//import org.apache.spark.ml.feature.StringIndexer
import scopt.OptionParser

import scala.collection.mutable.ArrayBuffer
object xgboost_test {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(
      env: String = "dev",
      date: String = "2018-08-01"
  ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text("the database environment to use")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text("the date to use")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin + s"| --env ${defaultParams.env}")
  }
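  // Example invocation (a sketch: the jar path comes from the note above; the
  // entry-point class is assumed to be this object rather than WeafareStat):
  //   spark-submit --class com.gmei.xgboost_test \
  //     ./target/scala-2.11/feededa-assembly-0.1.jar --env dev --date 2018-08-01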
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2
      import sc.implicits._ // needed for the rdd.toDF and Dataset.map calls below

      sc.sql("use eagle")

      val now: Date = new Date()
      val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyyMMdd")
      // lower bound of the lookback window: 180 days before today
      val date = dateFormat.format(now.getTime - 86400000L * 180)
      val nowData = dateFormat.format(now.getTime).toInt
      // clicks on meigou (welfare) service detail pages
      sc.sql(
        s"""
           |select device['device_id'] as device_id, params['service_id'] as service_id,
           |city_id,
           |partition_date
           |from online.tl_hdfs_maidian_view
           |WHERE action = 'goto_welfare_detail' and partition_date > '${date}'
         """.stripMargin).createTempView("click")

      // private-message consultations
      sc.sql(
        s"""
           |select device['device_id'] as device_id,params['service_id'] as service_id,city_id,partition_date
           |from online.tl_hdfs_maidian_view
           |WHERE action = 'welfare_detail_click_message' and partition_date > '${date}'
         """.stripMargin).createTempView("message")

      // phone consultations
      sc.sql(
        s"""
           |select device['device_id'] as device_id,params['service_id'] as service_id,city_id,partition_date
           |from online.tl_hdfs_maidian_view
           |WHERE action = 'welfare_detail_click_phone' and partition_date > '${date}'
         """.stripMargin).createTempView("phone")
      val meigou_click = sc.sql(
        """
          |select DISTINCT click.device_id,tag.tag_id,click.partition_date
          |from click
          |left join online.tl_meigou_service_view meigou
          |on click.service_id = meigou.id
          |left join eagle.tag_table tag
          |on meigou.project_type_id = tag.id
          |where tag.tag_id is not null
        """.stripMargin).rdd
        .map(row => {
          // weight each click by recency: < 8 days old -> 20, < 16 days -> 5, else 1
          var result = (("", ""), 0)
          if ((nowData - row(2).toString.toInt) < 8) {
            result = ((row(0).toString, row(1).toString), 20)
          } else if ((nowData - row(2).toString.toInt) < 16) {
            result = ((row(0).toString, row(1).toString), 5)
          } else {
            result = ((row(0).toString, row(1).toString), 1)
          }
          result
        })
        .map(row => (row._1._1.toString, row._1._2.toString, row._2))
        .toDF("device_id", "type", "count")
      meigou_click.show()

      val t_click = meigou_click.select("type")
        .map(row => row.getAs[String]("type"))
        .distinct().collect().toList

      // one column per tag id, summing the recency-weighted click counts
      meigou_click.groupBy("device_id")
        .pivot("type", t_click)
        .sum("count")
        .na.fill(0)
        .createTempView("meigou_click")

      val temp = sc.sql(
        """
          |select *
          |from meigou_click
        """.stripMargin)
      temp.show()
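      // The recency weighting above (20 / 5 / 1) plus the pivot could also be done
      // with DataFrame expressions instead of the RDD round-trip. A minimal sketch,
      // assuming a DataFrame `clicks` with device_id, tag_id and an integer yyyyMMdd
      // partition_date (names here are illustrative, not from this file):
      //
      //   import org.apache.spark.sql.functions.{col, lit, when}
      //   clicks
      //     .withColumn("count",
      //       when(lit(nowData) - col("partition_date").cast("int") < 8, 20)
      //         .when(lit(nowData) - col("partition_date").cast("int") < 16, 5)
      //         .otherwise(1))
      //     .groupBy("device_id").pivot("tag_id").sum("count").na.fill(0)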
      sc.sql(
        """
          |select DISTINCT message.device_id,tag.tag_id,message.partition_date
          |from message
          |left join online.tl_meigou_service_view meigou
          |on message.service_id = meigou.id
          |left join eagle.tag_table tag
          |on meigou.project_type_id = tag.id
          |where tag.tag_id is not null
        """.stripMargin).rdd
        .map(row => ((row(0).toString, row(1).toString), 1))
        .reduceByKey((x, y) => x + y)
        .map(row => (row._1._1, row._1._2, row._2))
        .toDF("device_id", "type", "count")
        .createTempView("meigou_message")

      sc.sql(
        """
          |select DISTINCT phone.device_id,tag.tag_id,phone.partition_date
          |from phone
          |left join online.tl_meigou_service_view meigou
          |on phone.service_id = meigou.id
          |left join eagle.tag_table tag
          |on meigou.project_type_id = tag.id
          |where tag.tag_id is not null
        """.stripMargin).rdd
        .map(row => ((row(0).toString, row(1).toString), 1))
        .reduceByKey((x, y) => x + y)
        .map(row => (row._1._1, row._1._2, row._2))
        .toDF("device_id", "type", "count")
        .createTempView("meigou_phone")
      val test_table_ml = sc.sql(
        """
          |select click.*,message.type as message, phone.type as phone from meigou_click click
          |left join meigou_message message on click.device_id = message.device_id
          |left join meigou_phone phone on click.device_id = phone.device_id
        """.stripMargin)
      test_table_ml.show()

      val df = test_table_ml
        .where("message is not null")
        .where("message = phone")
        .drop("device_id", "phone")
      println("training rows before dedup:", df.count())

      // drop rows that are duplicates across the pivoted tag-id columns
      val df2 = df.dropDuplicates("7", "11", "3", "5", "971", "6933", "929", "992",
        "922", "9", "1", "1024", "10", "2214", "4", "12", "13", "2", "2054")
      df2.show()
      println("training rows after dedup:", df2.count())
      val num_class = df2.select("message").distinct().count()
      println("number of label classes:", num_class)

      // map the string label to a numeric class index
      val stringIndexer = new StringIndexer()
        .setInputCol("message")
        .setOutputCol("classIndex")
        .fit(df2)
      val labelTransformed = stringIndexer.transform(df2)

      // every remaining column except the label and its index is a feature
      val features = new ArrayBuffer[String]()
      labelTransformed.schema.foreach(r => features += r.name.toString)
      features -= "classIndex"
      features -= "message"

      val label_index = labelTransformed.select("classIndex", "message").distinct()
      label_index.createTempView("label_index")

      val vectorAssembler = new VectorAssembler()
        .setInputCols(features.toArray)
        .setOutputCol("features")
      val xgbInput = vectorAssembler.transform(labelTransformed).select("features", "classIndex")
      val xgbParam = Map(
        "eta" -> 0.3f,
        "max_depth" -> 3,
        "objective" -> "multi:softprob",
        "num_class" -> num_class.toInt,
        "num_round" -> 100,
        "eval_metric" -> "merror",
        "num_workers" -> 7,
        "num_early_stopping_rounds" -> 5,
        "maximize_evaluation_metrics" -> true
      )
      val xgbClassifier = new XGBoostClassifier(xgbParam)
        .setFeaturesCol("features")
        .setLabelCol("classIndex")

      val Array(training, test) = xgbInput.randomSplit(Array(0.8, 0.2))
      val xgbClassificationModel = xgbClassifier.fit(training)
      val evaluatorResult = xgbClassificationModel.transform(test)

      val evaluator = new MulticlassClassificationEvaluator()
        .setLabelCol("classIndex")
        .setPredictionCol("prediction")
      val accuracy = evaluator.setMetricName("accuracy").evaluate(evaluatorResult)
      val weightedPrecision = evaluator.setMetricName("weightedPrecision").evaluate(evaluatorResult)
      val weightedRecall = evaluator.setMetricName("weightedRecall").evaluate(evaluatorResult)
      val f1 = evaluator.setMetricName("f1").evaluate(evaluatorResult)
      // val auc = evaluator.setMetricName("areaUnderROC").evaluate(evaluatorResult)
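      // BinaryClassificationMetrics is imported above but never used; when num_class
      // is 2, AUC could be computed with the ML-package evaluator instead. A minimal
      // sketch (column names follow Spark ML defaults; this is an assumption, not
      // part of the original job):
      //
      //   import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
      //   val auc = new BinaryClassificationEvaluator()
      //     .setLabelCol("classIndex")
      //     .setRawPredictionCol("rawPrediction")
      //     .setMetricName("areaUnderROC")
      //     .evaluate(evaluatorResult)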
      println("=================================")
      println(accuracy)
      println(weightedPrecision)
      println(weightedRecall)
      println(f1)
      // println(auc)
      println("=================================")
    }
  }
}