package com.gmei
import java.io.Serializable
import java.time.LocalDate
import com.gmei.lib.AbstractParams
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import scopt.OptionParser
import scala.util.parsing.json.JSON
object esmm_feature {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev",
date: String = "2018-08-01"
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("WeafareStat")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
opt[String] ("date")
.text(s"the date you used")
.action((x,c) => c.copy(date = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}
def main(args: Array[String]): Unit = {
parser.parse(args, defaultParams).map { param =>
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
user_feature(sc)
get_applist(sc)
sc.stop()
}}
def get_applist(spark:SparkSession): Unit ={
val yesterday = LocalDate.now().minusDays(1).toString.replace("-","")
println(yesterday)
val df = spark.sql(
s"""
|select device["device_id"] as device_id,cl_type,params["installed_app_info"]
|from online.tl_hdfs_maidian_view where partition_date = $yesterday
|and action = 'user_installed_all_app_info'
""".stripMargin).dropDuplicates("device_id")
df.persist()
val old = spark.sql("select device_id from jerry_test.device_app_list").collect().map(x => x(0).toString)
import spark.implicits._
val android = df.rdd.map(x => (x(0).toString,x(1).toString,x(2).toString))
.filter(x => x._2 == "android").map(x => (x._1,x._2,parse_json(x._3),yesterday))
val ios = df.rdd.map(x => (x(0).toString,x(1).toString,x(2).toString))
.filter(x => x._2 == "ios").map(x => (x._1,x._2,x._3,yesterday))
val rdd = android.union(ios)
val new_user = rdd.filter(x => old.indexOf(x._1)== -1)
.toDF("device_id","os","app_list","update_date")
if (new_user.take(1).nonEmpty){
val tecent_jdbc = "jdbc:mysql://172.16.40.158:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(tecent_jdbc, new_user,"device_app_list", SaveMode.Append)
}else{
println("没有新用户需要写入")
}
df.unpersist()
}
def parse_json(str:String): String ={
var t = List[Map[String, Any]]()
val result = JSON.parseFull(str)
result match {
case Some(b: List[Map[String, Any]]) => t = t ++ b
case None => println("Parsing failed")
case other => println("Unknown data structure: " + other)
}
var x = List[String]()
if (t.nonEmpty){
for (i <- t){
x = x:+i("appName").toString
}
}
x.mkString(",")
}
def user_feature(spark:SparkSession): Unit ={
val yesterday = LocalDate.now().minusDays(1).toString.replace("-","")
println(yesterday)
val sql_exist = "select device_id from jerry_test.user_feature"
val old = spark.sql(sql_exist)
.collect().map(x => x(0).toString)
val sql_yesterday =
s"""
|select device["device_id"] as id,device["device_type"],device["manufacturer"],city_id,channel,
|partition_date from online.tl_hdfs_maidian_view where partition_date = $yesterday
""".stripMargin
val rdd = spark.sql(sql_yesterday).repartition(200).na.drop().dropDuplicates("id").rdd
.map(x =>(x(0).toString,x(1).toString,x(2).toString,x(3).toString,
x(4).toString,x(5).toString))
import spark.implicits._
val df_new = rdd.filter(x => old.indexOf(x._1)== -1)
.toDF("device_id","device_type","manufacturer","city_id","channel","date")
if (df_new.take(1).nonEmpty){
val tecent_jdbc = "jdbc:mysql://172.16.40.158:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(tecent_jdbc, df_new, "user_feature", SaveMode.Append)
}else {
println("no need to insert into user feature")
}
}
}