Commit 06e2ea5a authored by 张彦钊's avatar 张彦钊

修改esmm测试项目的预测集sql

parent 72bbad23
package com.gmei
import java.io.Serializable
import java.time.LocalDate
import com.gmei.lib.AbstractParams
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, TiContext}
import scopt.OptionParser
import scala.util.parsing.json.JSON
object esmm_app_list {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev",
date: String = "2018-08-01"
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("WeafareStat")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
opt[String] ("date")
.text(s"the date you used")
.action((x,c) => c.copy(date = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}
def main(args: Array[String]): Unit = {
parser.parse(args, defaultParams).map { param =>
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
val yesterday = LocalDate.now().minusDays(118).toString.replace("-","")
println(yesterday)
get_applist(sc,yesterday)
sc.stop()
}}
def get_applist(spark:SparkSession,yesterday:String): Unit ={
val df = spark.sql(
s"""
|select device["device_id"] as device_id,cl_type,params["installed_app_info"]
|from online.tl_hdfs_maidian_view where partition_date = $yesterday
|and action = 'user_installed_all_app_info'
""".stripMargin).dropDuplicates("device_id")
df.persist()
val ti = new TiContext(spark)
ti.tidbMapTable(dbName = "jerry_test",tableName = "device_app_list")
val old = spark.sql("select device_id from device_app_list").collect().map(x => x(0).toString)
import spark.implicits._
val android = df.rdd.map(x => (x(0).toString,x(1).toString,x(2).toString))
.filter(x => x._2 == "android").map(x => (x._1,x._2,parse_json(x._3),yesterday))
val ios = df.rdd.map(x => (x(0).toString,x(1).toString,x(2).toString))
.filter(x => x._2 == "ios").map(x => (x._1,x._2,x._3,yesterday))
val rdd = android.union(ios)
val new_user = rdd.filter(x => old.indexOf(x._1)== -1)
.toDF("device_id","os","app_list","update_date")
if (new_user.take(1).nonEmpty){
val jdbc = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(jdbc, new_user,"device_app_list", SaveMode.Append)
val tecent_jdbc = "jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(tecent_jdbc, new_user,"device_app_list", SaveMode.Append)
}else{
println("没有新用户需要写入")
}
df.unpersist()
}
def parse_json(str:String): String ={
var t = List[Map[String, Any]]()
val result = JSON.parseFull(str)
result match {
case Some(b: List[Map[String, Any]]) => t = t ++ b
case None => println("Parsing failed")
case other => println("Unknown data structure: " + other)
}
var x = List[String]()
if (t.nonEmpty){
for (i <- t){
x = x:+i("appName").toString
}
}
x.mkString(",")
}
}
...@@ -143,7 +143,7 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map): ...@@ -143,7 +143,7 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
"left join cid_type_top c on e.device_id = c.device_id " \ "left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_time_cut cut on e.cid_id = cut.cid " \ "left join cid_time_cut cut on e.cid_id = cut.cid " \
"left join device_app_list dl on e.device_id = dl.device_id " \ "left join device_app_list dl on e.device_id = dl.device_id " \
"left join diary_feat feat on e.cid_id = feat.diary_id" "left join diary_feat feat on e.cid_id = feat.diary_id limit 600"
df = con_sql(db, sql) df = con_sql(db, sql)
df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name", df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top",10: "device_id", 6: "device_type", 7: "manufacturer", 8: "channel", 9: "top",10: "device_id",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment