Commit e92a8969 authored by 张彦钊

Add esmm feature file

parent b3194a12
@@ -12,7 +12,7 @@ import scopt.OptionParser
import scala.util.parsing.json.JSON
object esmm_app_list {
object esmm_feature {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
@@ -46,15 +46,19 @@ object esmm_app_list {
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "jerry_test",tableName = "device_app_list")
ti.tidbMapTable(dbName = "jerry_test",tableName = "user_feature")
val yesterday = LocalDate.now().minusDays(118).toString.replace("-","")
println(yesterday)
user_feature(sc)
get_applist(sc)
get_applist(sc,yesterday)
sc.stop()
}}
def get_applist(spark:SparkSession,yesterday:String): Unit ={
def get_applist(spark:SparkSession): Unit ={
val yesterday = LocalDate.now().minusDays(1).toString.replace("-","")
println(yesterday)
val df = spark.sql(
s"""
|select device["device_id"] as device_id,cl_type,params["installed_app_info"]
@@ -62,8 +66,7 @@ object esmm_app_list {
|and action = 'user_installed_all_app_info'
""".stripMargin).dropDuplicates("device_id")
df.persist()
val ti = new TiContext(spark)
ti.tidbMapTable(dbName = "jerry_test",tableName = "device_app_list")
val old = spark.sql("select device_id from device_app_list").collect().map(x => x(0).toString)
import spark.implicits._
@@ -108,5 +111,34 @@ object esmm_app_list {
}
x.mkString(",")
}
def user_feature(spark:SparkSession): Unit ={
val yesterday = LocalDate.now().minusDays(1).toString.replace("-","")
println(yesterday)
val sql_exist = "select device_id from user_feature"
val old = spark.sql(sql_exist)
.collect().map(x => x(0).toString)
val sql_yesterday =
s"""
|select device["device_id"] as id,device["device_type"],device["manufacturer"],city_id,channel,
|partition_date from online.tl_hdfs_maidian_view where partition_date = $yesterday
""".stripMargin
val rdd = spark.sql(sql_yesterday).repartition(200).na.drop().dropDuplicates("id").rdd
.map(x =>(x(0).toString,x(1).toString,x(2).toString,x(3).toString,
x(4).toString,x(5).toString))
import spark.implicits._
val df_new = rdd.filter(x => old.indexOf(x._1)== -1)
.toDF("device_id","device_type","manufacturer","city_id","channel","date")
if (df_new.take(1).nonEmpty){
df_new.persist()
val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(jdbcuri, df_new, "user_feature", SaveMode.Append)
val tecent_jdbc = "jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(tecent_jdbc, df_new, "user_feature", SaveMode.Append)
df_new.unpersist()
}else {
println("no need to insert into user feature")
}
}
}
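
Note on the new user_feature step: it keeps only devices not yet stored in jerry_test.user_feature by collecting the existing device_ids to the driver and calling old.indexOf on every candidate row. The same "only insert unseen device_ids" idea can be expressed as a left-anti join so the stored ids never leave the cluster. The sketch below is hypothetical, not part of the commit; the table and column names come from the diff, while the helper name newDevicesOnly and its shape are illustrative only.

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: drop every candidate row whose device_id already exists
// in user_feature. Assumes user_feature was mapped into the session
// (e.g. via ti.tidbMapTable in main), as the diff does.
def newDevicesOnly(spark: SparkSession, candidates: DataFrame): DataFrame = {
  val existing = spark.sql("select device_id from user_feature")
  // "left_anti" keeps only candidates with no matching device_id in `existing`
  candidates.join(existing, Seq("device_id"), "left_anti")
}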
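
The insert itself goes through GmeiConfig.writeToJDBCTable, whose implementation is not part of this diff. For orientation only, a minimal sketch of appending df_new to user_feature with Spark's built-in JDBC writer, assuming a MySQL-compatible TiDB endpoint; the driver setting and helper name are assumptions, not taken from GmeiConfig.

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode}

// Minimal sketch (assumption): append new rows to user_feature over JDBC.
def appendUserFeature(df: DataFrame, jdbcUri: String): Unit = {
  val props = new Properties()
  props.put("driver", "com.mysql.jdbc.Driver") // assumed MySQL-compatible driver for TiDB
  df.write.mode(SaveMode.Append).jdbc(jdbcUri, "user_feature", props)
}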