package com.gmei

import java.io.Serializable
import java.time.LocalDate

import com.gmei.lib.AbstractParams
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import scopt.OptionParser

import scala.util.parsing.json.JSON

/**
  * Daily feature ETL for the ESMM model: pulls yesterday's installed-app lists and
  * basic device attributes from online.tl_hdfs_maidian_view and appends the devices
  * that are not yet present to jerry_test.device_app_list and jerry_test.user_feature.
  */
object esmm_feature {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text(s"the date you used")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin + s"|  --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      user_feature(sc)
      get_applist(sc)

      sc.stop()
    }
  }

  /**
    * Collect yesterday's installed-app reports and append the devices that are not yet
    * in jerry_test.device_app_list. Android payloads are JSON arrays and are flattened
    * into a comma-separated app-name string; iOS payloads are written as-is.
    */
  def get_applist(spark: SparkSession): Unit = {
    val yesterday = LocalDate.now().minusDays(1).toString.replace("-", "")
    println(yesterday)
    val df = spark.sql(
      s"""
         |select device["device_id"] as device_id,cl_type,params["installed_app_info"]
         |from online.tl_hdfs_maidian_view where partition_date = $yesterday
         |and action = 'user_installed_all_app_info'
       """.stripMargin).dropDuplicates("device_id")
    df.persist()

    // Devices already stored, used to keep the insert incremental.
    val old = spark.sql("select device_id from jerry_test.device_app_list")
      .collect().map(x => x(0).toString).toSet

    import spark.implicits._
    val android = df.rdd.map(x => (x(0).toString, x(1).toString, x(2).toString))
      .filter(x => x._2 == "android").map(x => (x._1, x._2, parse_json(x._3), yesterday))
    val ios = df.rdd.map(x => (x(0).toString, x(1).toString, x(2).toString))
      .filter(x => x._2 == "ios").map(x => (x._1, x._2, x._3, yesterday))
    val rdd = android.union(ios)
    val new_user = rdd.filter(x => !old.contains(x._1))
      .toDF("device_id", "os", "app_list", "update_date")

    if (new_user.take(1).nonEmpty) {
      val tecent_jdbc = "jdbc:mysql://172.16.40.158:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
      GmeiConfig.writeToJDBCTable(tecent_jdbc, new_user, "device_app_list", SaveMode.Append)
    } else {
      println("no new devices to write")
    }
    df.unpersist()
  }

  /** Parse an installed_app_info JSON array and return the app names joined by commas. */
  def parse_json(str: String): String = {
    var t = List[Map[String, Any]]()
    JSON.parseFull(str) match {
      case Some(b: List[Map[String, Any]] @unchecked) => t = t ++ b
      case None => println("Parsing failed")
      case other => println("Unknown data structure: " + other)
    }
    t.map(i => i("appName").toString).mkString(",")
  }

  /**
    * Collect yesterday's basic device attributes (device type, manufacturer, city,
    * channel) and append the devices that are not yet in jerry_test.user_feature.
    */
  def user_feature(spark: SparkSession): Unit = {
    val yesterday = LocalDate.now().minusDays(1).toString.replace("-", "")
    println(yesterday)

    // Devices already stored, used to keep the insert incremental.
    val sql_exist = "select device_id from jerry_test.user_feature"
    val old = spark.sql(sql_exist)
      .collect().map(x => x(0).toString).toSet

    val sql_yesterday =
      s"""
         |select device["device_id"] as id,device["device_type"],device["manufacturer"],city_id,channel,
         |partition_date from online.tl_hdfs_maidian_view where partition_date = $yesterday
       """.stripMargin
    val rdd = spark.sql(sql_yesterday).repartition(200).na.drop().dropDuplicates("id").rdd
      .map(x => (x(0).toString, x(1).toString, x(2).toString, x(3).toString,
        x(4).toString, x(5).toString))

    import spark.implicits._
    val df_new = rdd.filter(x => !old.contains(x._1))
      .toDF("device_id", "device_type", "manufacturer", "city_id", "channel", "date")

    if (df_new.take(1).nonEmpty) {
      val tecent_jdbc = "jdbc:mysql://172.16.40.158:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
      GmeiConfig.writeToJDBCTable(tecent_jdbc, df_new, "user_feature", SaveMode.Append)
    } else {
      println("no need to insert into user feature")
    }
  }

}