Commit 96d74920 authored by 王志伟's avatar 王志伟

跑波动原因数据

parent de2a8606
......@@ -957,75 +957,78 @@ object find_reason {
//2.当天新用户中的点击用户数
// val new_clk_count = sc.sql(
// s"""
// |select '${stat_date}' as stat_date,count(distinct(oc.device_id)) as new_clk_count
// |from all_clk_diary_card oc inner join device_id_new
// |on oc.device_id = device_id_new.device_id
// """.stripMargin
// )
////2.1 有点击的新用户
// val new_clk_device = sc.sql(
// s"""
// |select distinct(oc.device_id) as device_id
// |from all_clk_diary_card oc inner join device_id_new
// |on oc.device_id = device_id_new.device_id
// """.stripMargin
// )
// new_clk_device.createOrReplaceTempView("new_clk_device")
//
//
// //3.当天老用户数
//
// val old_count = sc.sql(
// s"""
// |select '${stat_date}' as stat_date,count(distinct(dio.device_id)) as old_count
// |from device_id_old dio left join agency_id
// |on dio.device_id = agency_id.device_id
// |where agency_id.device_id is null
// """.stripMargin
// )
//
// //4.当天新用户数
// val new_count = sc.sql(
// s"""
// |select '${stat_date}' as stat_date,count(distinct(din.device_id)) as new_count
// |from device_id_new din left join agency_id
// |on din.device_id = agency_id.device_id
// |where agency_id.device_id is null
// """.stripMargin
// )
//
// //5.有点击老用户的曝光数
// val exp_clkold_count = sc.sql(
// s"""
// |select '${stat_date}' as stat_date,count(dp.device_id) as imp_clkold_count
// |from data_feed_exposure_precise dp inner join old_clk_device
// |on dp.device_id = old_clk_device.device_id
// |where stat_date='${stat_date}'
// |group by stat_date
// """.stripMargin
// )
//
// //6.有点击新用户的曝光数
// val exp_clknew_count = sc.sql(
// s"""
// |select '${stat_date}' as stat_date,count(dp.device_id) as imp_clknew_count
// |from data_feed_exposure_precise dp inner join new_clk_device
// |on dp.device_id = new_clk_device.device_id
// |where stat_date='${stat_date}'
// |group by stat_date
// """.stripMargin
// )
//
// val result = old_clk_count.join(new_clk_count,"stat_date")
// .join(old_count,"stat_date")
// .join(new_count,"stat_date")
// .join(exp_clkold_count,"stat_date")
// .join(exp_clknew_count,"stat_date")
//
// 2. Number of clicking users among today's new users.
// NOTE(review): assumes temp views all_clk_diary_card / device_id_new /
// device_id_old / agency_id / old_clk_device and the value of ${stat_date}
// were registered earlier in this method — confirm against the full file.
val new_clk_count = sc.sql(
s"""
|select '${stat_date}' as stat_date,count(distinct(oc.device_id)) as new_clk_count
|from all_clk_diary_card oc inner join device_id_new
|on oc.device_id = device_id_new.device_id
""".stripMargin
)
// 2.1 New users that have at least one click (distinct device ids).
val new_clk_device = sc.sql(
s"""
|select distinct(oc.device_id) as device_id
|from all_clk_diary_card oc inner join device_id_new
|on oc.device_id = device_id_new.device_id
""".stripMargin
)
new_clk_device.createOrReplaceTempView("new_clk_device")
// 3. Count of today's old (returning) users, excluding agency devices
// (anti-join: left join + "is null" filter drops ids present in agency_id).
val old_count = sc.sql(
s"""
|select '${stat_date}' as stat_date,count(distinct(dio.device_id)) as old_count
|from device_id_old dio left join agency_id
|on dio.device_id = agency_id.device_id
|where agency_id.device_id is null
""".stripMargin
)
// 4. Count of today's new users, excluding agency devices (same anti-join).
val new_count = sc.sql(
s"""
|select '${stat_date}' as stat_date,count(distinct(din.device_id)) as new_count
|from device_id_new din left join agency_id
|on din.device_id = agency_id.device_id
|where agency_id.device_id is null
""".stripMargin
)
// 5. Exposure (impression) count attributed to old users who clicked.
val exp_clkold_count = sc.sql(
s"""
|select '${stat_date}' as stat_date,count(dp.device_id) as imp_clkold_count
|from data_feed_exposure_precise dp inner join old_clk_device
|on dp.device_id = old_clk_device.device_id
|where stat_date='${stat_date}'
|group by stat_date
""".stripMargin
)
// 6. Exposure (impression) count attributed to new users who clicked.
val exp_clknew_count = sc.sql(
s"""
|select '${stat_date}' as stat_date,count(dp.device_id) as imp_clknew_count
|from data_feed_exposure_precise dp inner join new_clk_device
|on dp.device_id = new_clk_device.device_id
|where stat_date='${stat_date}'
|group by stat_date
""".stripMargin
)
// Assemble all per-day metrics into one row keyed by stat_date and persist
// to the MySQL/TiDB table device_clk_imp_reason (append mode).
val result = old_clk_count.join(new_clk_count,"stat_date")
.join(old_count,"stat_date")
.join(new_count,"stat_date")
.join(exp_clkold_count,"stat_date")
.join(exp_clknew_count,"stat_date")
// GmeiConfig.writeToJDBCTable(result, "device_clk_imp_reason", SaveMode.Append)
// NOTE(review): JDBC credentials are hard-coded in the URL — consider moving
// them to configuration.
GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",result, table="device_clk_imp_reason",SaveMode.Append)
println("写入完成")  // "write complete"
}
......
package com.gmei
import java.io.Serializable
import com.gmei.WeafareStat.{defaultParams, parser}
import org.apache.spark.sql.{SaveMode}
//import org.apache.spark.ml
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams
import java.sql.{Connection, DriverManager}
import java.text.SimpleDateFormat
import java.util.{Date}
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
//import org.apache.spark.ml.feature.StringIndexer
import scala.collection.mutable.ArrayBuffer
// Trains a multi-class XGBoost model predicting which consultation tag a
// device will send a private message about, using recency-weighted service
// ("meigou") click counts per tag as features.
object xgboost_test {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  // CLI parameters: database environment and a date (date currently unused in main).
  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text(s"the date you used")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"| --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2
      sc.sql("use eagle")

      val now: Date = new Date()
      val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyyMMdd")
      // Look back 180 days of behaviour data.
      val date = dateFormat.format(now.getTime - 86400000L * 180)
      // NOTE(review): subtracting yyyyMMdd values as plain integers only
      // approximates "days apart" across month boundaries — confirm intent.
      val nowData = dateFormat.format(now.getTime).toInt

      // Service ("meigou") detail-page clicks.
      sc.sql(
        s"""
           |select device['device_id'] as device_id, params['service_id'] as service_id,
           |city_id,
           |partition_date
           |from online.tl_hdfs_maidian_view
           |WHERE action = 'goto_welfare_detail' and partition_date > '${date}'
      """.stripMargin).createTempView("click")

      // Private-message consultation clicks.
      sc.sql(
        s"""
           |select device['device_id'] as device_id,params['service_id'] as service_id,city_id,partition_date
           |from online.tl_hdfs_maidian_view
           |WHERE action = 'welfare_detail_click_message' and partition_date > '${date}'
      """.stripMargin).createTempView("message")

      // Phone consultation clicks.
      sc.sql(
        s"""
           |select device['device_id'] as device_id,params['service_id'] as service_id,city_id,partition_date
           |from online.tl_hdfs_maidian_view
           |WHERE action = 'welfare_detail_click_phone' and partition_date > '${date}'
      """.stripMargin).createTempView("phone")

      // Map each click to (device_id, tag_id) with a recency weight:
      // < 8 "days" -> 20, < 16 -> 5, older -> 1.
      val meigou_click = sc.sql(
        """
          |select DISTINCT click.device_id,tag.tag_id,click.partition_date
          |from click
          |left join online.tl_meigou_service_view meigou
          |on click.service_id = meigou.id
          |left join eagle.tag_table tag
          |on meigou.project_type_id = tag.id
          |where tag.tag_id is not null
        """.stripMargin).rdd.map(row => {
        val key = (row(0).toString, row(1).toString())
        val age = nowData - row(2).toString.toInt
        val weight = if (age < 8) 20 else if (age < 16) 5 else 1
        (key, weight)
      }).map(row => {
        (row._1._1.toString, row._1._2.toString, row._2)
      }).toDF("device_id", "type", "count")
      meigou_click.show()

      // Pivot tag ids into columns: one row per device, summed weights per tag.
      val t_click = meigou_click.select("type")
        .map(row => row.getAs[String]("type")).distinct().collect().toList
      meigou_click.groupBy("device_id")
        .pivot("type", t_click)
        .sum("count")
        .na.fill(0).createTempView("meigou_click")

      // Sanity-check the pivoted features.
      // Fix: original called `spark.sql`, but the session in scope is `sc`.
      val temp = sc.sql(
        """
          |select *
          |from meigou_click
        """.stripMargin
      )
      temp.show()

      // Per-device message-consultation counts by tag.
      sc.sql(
        """
          |select DISTINCT message.device_id,tag.tag_id,message.partition_date
          |from message
          |left join online.tl_meigou_service_view meigou
          |on message.service_id = meigou.id
          |left join eagle.tag_table tag
          |on meigou.project_type_id = tag.id
          |where tag.tag_id is not null
        """.stripMargin).rdd.map(row => {
        ((row(0).toString, row(1).toString()), 1)
      }).reduceByKey(_ + _).map(row => {
        (row._1._1, row._1._2, row._2)
      }).toDF("device_id", "type", "count").createTempView("meigou_message")

      // Per-device phone-consultation counts by tag.
      sc.sql(
        """
          |select DISTINCT phone.device_id,tag.tag_id,phone.partition_date
          |from phone
          |left join online.tl_meigou_service_view meigou
          |on phone.service_id = meigou.id
          |left join eagle.tag_table tag
          |on meigou.project_type_id = tag.id
          |where tag.tag_id is not null
        """.stripMargin).rdd.map(row => {
        ((row(0).toString, row(1).toString()), 1)
      }).reduceByKey(_ + _).map(row => {
        (row._1._1, row._1._2, row._2)
      }).toDF("device_id", "type", "count").createTempView("meigou_phone")

      // Join click features with message/phone labels.
      val test_table_ml = sc.sql(
        """
          |select click.*,message.type as message, phone.type as phone from meigou_click click
          |left join meigou_message message on click.device_id = message.device_id
          |left join meigou_phone phone on click.device_id = phone.device_id
        """.stripMargin)
      test_table_ml.show()

      // Keep devices whose message tag equals their phone tag (consistent label).
      val df = test_table_ml
        .where("message is not null")
        .where("message = phone")
        .drop("device_id", "phone")
      println("去重之前训练集数据:", df.count())

      // NOTE(review): the dedup column list is hard-coded tag ids — confirm it
      // matches the pivoted columns actually produced.
      val df2 = df.dropDuplicates("7", "11", "3", "5", "971", "6933", "929", "992", "922", "9", "1", "1024", "10", "2214", "4", "12", "13", "2", "2054")
      df2.show()
      println("去重之后训练集数据:", df2.count())

      val num_class = df2.select("message").distinct().count()
      println("标签数据:", num_class)

      // Encode the string label into a numeric class index.
      val stringIndexer = new StringIndexer().
        setInputCol("message").
        setOutputCol("classIndex").fit(df2)
      val labelTransformed = stringIndexer.transform(df2)

      // All remaining columns except the label and its index are features.
      val features = new ArrayBuffer[String]()
      labelTransformed.schema.foreach(r => (features += r.name.toString))
      features -= "classIndex"
      features -= "message"

      val label_index = labelTransformed.select("classIndex", "message").distinct()
      label_index.createTempView("label_index")

      val vectorAssembler = new VectorAssembler().
        setInputCols(features.toArray).
        setOutputCol("features")
      val xgbInput = vectorAssembler.transform(labelTransformed).select("features", "classIndex")

      // Fix: XGBoost parameter keys are case-sensitive — "eval_Metric" would be
      // silently ignored; the correct key is "eval_metric".
      // NOTE(review): maximize_evaluation_metrics is passed as the string "True";
      // confirm the binding accepts a string rather than a Boolean.
      val xgbParam = Map("eta" -> 0.3f,
        "max_depth" -> 3,
        "objective" -> "multi:softprob",
        "num_class" -> num_class,
        "num_round" -> 100,
        "eval_metric" -> "merror",
        "num_workers" -> 7,
        "num_early_stopping_rounds" -> 5,
        "maximize_evaluation_metrics" -> "True")
      val xgbClassifier = new XGBoostClassifier(xgbParam).
        setFeaturesCol("features").
        setLabelCol("classIndex")

      // 80/20 train/test split, then evaluate on the held-out set.
      val Array(training, test) = xgbInput.randomSplit(Array(0.8, 0.2))
      val xgbClassificationModel = xgbClassifier.fit(training)
      val evaluatorResult = xgbClassificationModel.transform(test)

      val evaluator = new MulticlassClassificationEvaluator()
        .setLabelCol("classIndex")
        .setPredictionCol("prediction")
      val accuracy = evaluator.setMetricName("accuracy").evaluate(evaluatorResult)
      val weightedPrecision = evaluator.setMetricName("weightedPrecision").evaluate(evaluatorResult)
      val weightedRecall = evaluator.setMetricName("weightedRecall").evaluate(evaluatorResult)
      val f1 = evaluator.setMetricName("f1").evaluate(evaluatorResult)
      // val auc=evaluator.setMetricName("areaUnderROC").evaluate(evaluatorResult)
      println("=================================")
      println(accuracy)
      println(weightedPrecision)
      println(weightedRecall)
      println(f1)
      // println(auc)
      println("=================================")
    }
  }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment