package com.gmei


import java.io.Serializable
import java.time.LocalDate

import com.gmei.lib.AbstractParams
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import scopt.OptionParser

import scala.util.parsing.json.JSON


object esmm_feature {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()
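
  // JDBC URL of the TiDB instance that both feature jobs below append to
  // (extracted here because the same literal is used by both writers).
  val tecent_jdbc = "jdbc:mysql://172.16.40.158:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"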

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("esmm_feature")
    opt[String]("env")
      .text("the database environment to use")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text("the date to use")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a TiDB dataset:
        |
        | spark-submit --class com.gmei.esmm_feature ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }

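  // Parse CLI options, build the shared SparkSession, and run both feature jobs.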
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams) match {
      case Some(param) =>
        GmeiConfig.setup(param.env)
        val spark = GmeiConfig.getSparkSession()._2

        user_feature(spark)
        get_applist(spark)

        spark.stop()
      case None =>
        sys.exit(1)
    }
  }

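  /**
   * Pulls yesterday's installed-app events from online.tl_hdfs_maidian_view and
   * appends devices not yet present in jerry_test.device_app_list to that table.
   */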
  def get_applist(spark: SparkSession): Unit = {
    val yesterday = LocalDate.now().minusDays(1).toString.replace("-","")
    println(yesterday)
    val df = spark.sql(
      s"""
         |select device["device_id"] as device_id,cl_type,params["installed_app_info"]
         |from online.tl_hdfs_maidian_view where partition_date = $yesterday
         |and action = 'user_installed_all_app_info'
       """.stripMargin).dropDuplicates("device_id")
    df.persist()

    // Collect already-ingested device ids to the driver; a Set gives O(1)
    // membership checks when filtering new devices below.
    val old = spark.sql("select device_id from jerry_test.device_app_list")
      .collect().map(x => x(0).toString).toSet

    import spark.implicits._
    // Drop rows with null fields before calling toString to avoid NPEs on bad rows.
    val events = df.rdd.filter(x => x(0) != null && x(1) != null && x(2) != null)
      .map(x => (x(0).toString, x(1).toString, x(2).toString))

    // Android reports installed_app_info as a JSON array that needs parsing;
    // iOS payloads are passed through unchanged.
    val android = events.filter(x => x._2 == "android")
      .map(x => (x._1, x._2, parse_json(x._3), yesterday))
    val ios = events.filter(x => x._2 == "ios")
      .map(x => (x._1, x._2, x._3, yesterday))

    val rdd = android.union(ios)

    val new_user = rdd.filter(x => !old.contains(x._1))
      .toDF("device_id","os","app_list","update_date")
    if (new_user.take(1).nonEmpty) {
      GmeiConfig.writeToJDBCTable(tecent_jdbc, new_user, "device_app_list", SaveMode.Append)
    } else {
      println("no new users to write")
    }
    df.unpersist()
  }
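
  /**
   * Flattens an installed_app_info JSON array into a comma-separated string of
   * app names. Assuming a payload shaped like [{"appName":"a"},{"appName":"b"}]
   * (shape inferred from this code, not from upstream docs), it returns "a,b";
   * unparseable input yields an empty string.
   */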
  def parse_json(str: String): String = {
    JSON.parseFull(str) match {
      case Some(apps: List[Map[String, Any]] @unchecked) =>
        apps.map(_("appName").toString).mkString(",")
      case None =>
        println("Parsing failed")
        ""
      case other =>
        println("Unknown data structure: " + other)
        ""
    }
  }
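
  /**
   * Extracts device attributes (type, manufacturer, city, channel) from
   * yesterday's events and appends devices not yet present in
   * jerry_test.user_feature to that table.
   */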
  def user_feature(spark: SparkSession): Unit = {
    val yesterday = LocalDate.now().minusDays(1).toString.replace("-","")
    println(yesterday)
    val sql_exist = "select device_id from jerry_test.user_feature"
    // Collect existing device ids to the driver; a Set gives O(1) membership checks.
    val old = spark.sql(sql_exist)
      .collect().map(x => x(0).toString).toSet
    val sql_yesterday =
      s"""
         |select device["device_id"] as id,device["device_type"],device["manufacturer"],city_id,channel,
         |partition_date from online.tl_hdfs_maidian_view where partition_date = $yesterday
       """.stripMargin

    val rdd = spark.sql(sql_yesterday).repartition(200).na.drop().dropDuplicates("id").rdd
      .map(x => (x(0).toString, x(1).toString, x(2).toString, x(3).toString,
        x(4).toString, x(5).toString))
    import spark.implicits._
    val df_new = rdd.filter(x => !old.contains(x._1))
      .toDF("device_id","device_type","manufacturer","city_id","channel","date")
    if (df_new.take(1).nonEmpty) {
      GmeiConfig.writeToJDBCTable(tecent_jdbc, df_new, "user_feature", SaveMode.Append)
    } else {
      println("no need to insert into user feature")
    }
  }
}