package com.gmei


import java.io.Serializable
import java.time.LocalDate

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams
import org.apache.spark.sql.functions.lit

import scala.util.Try



object EsmmData {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = GmeiConfig.getMinusNDate(1)
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("EsmmData")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
        .text(s"the date you used")
        .action((x,c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }



  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "eagle",tableName = "src_mimas_prod_api_diary_tags")
      ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag")
      ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")
      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_train_data")


      val max_stat_date = sc.sql(
        s"""
           |select max(stat_date) from esmm_train_data
         """.stripMargin
      )
      val max_stat_date_str = max_stat_date.collect().map(s => s(0).toString).head
      println("max_stat_date_str",max_stat_date_str)
      println("param.date",param.date)
      if (max_stat_date_str != param.date){
        val stat_date = param.date
        println(stat_date)
//        val imp_data = sc.sql(
//          s"""
//             |select distinct stat_date,device_id,city_id as ucity_id,
//             |  cid_id,diary_service_id
//             |from data_feed_exposure
//             |where cid_type = 'diary'
//             |and stat_date ='${stat_date}'
//         """.stripMargin
//        )
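
        // Exposures for stat_date: diary impressions per device/city/cid/service, keeping only
        // combinations that were exposed more than once (count(*) > 1).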

        val imp_data = sc.sql(
          s"""
             |select * from
             |(select stat_date,device_id,city_id as ucity_id,cid_id,diary_service_id
             |from data_feed_exposure
             |where cid_type = 'diary'
             |and stat_date ='${stat_date}'
             |group by stat_date,device_id,city_id,cid_id,diary_service_id having count(*) > 1) a
         """.stripMargin
        )
        //      imp_data.show()
        //      println("imp_data.count()")
        //      println(imp_data.count())


        val clk_data = sc.sql(
          s"""
             |select distinct stat_date,device_id,city_id as ucity_id,
             |  cid_id,diary_service_id
             |from data_feed_click
             |where cid_type = 'diary'
             |and stat_date ='${stat_date}'
         """.stripMargin
        )
        //      clk_data.show()
        //      println("clk_data.count()")
        //      println(clk_data.count())
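
        // Label construction (ESMM-style): y appears to be the click label and z the conversion
        // label. Exposed-but-not-clicked rows get y=0,z=0; clicked-but-not-converted rows get
        // y=1,z=0; converted rows (welfare_detail views reached from a diary) get y=1,z=1.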



        val imp_data_filter = imp_data.except(clk_data).withColumn("y",lit(0)).withColumn("z",lit(0))
        //      imp_data_filter.createOrReplaceTempView("imp_data_filter")
        //      imp_data_filter.show()
        //      println("imp_data_filter.count()")
        //      println(imp_data_filter.count())


        val stat_date_not = stat_date.replace("-","")
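        // Conversions: welfare_detail page views that were referred from a diary detail page.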
        val cvr_data = sc.sql(
          s"""
             |select distinct
             |  from_unixtime(unix_timestamp(partition_date ,'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
             |  cl_id as device_id,city_id as ucity_id,
             |  params["referrer_id"] as cid_id,params["business_id"] as diary_service_id
             |from online.tl_hdfs_maidian_view
             |where action='page_view'
             |and partition_date ='${stat_date_not}'
             |and params['page_name'] = 'welfare_detail'
             |and params['referrer'] = 'diary_detail'
         """.stripMargin
        )

        val cvr_data_filter = cvr_data.withColumn("y",lit(1)).withColumn("z",lit(1))
        //      cvr_data_filter.createOrReplaceTempView("cvr_data_filter")
        //      cvr_data_filter.show()
        //      println("cvr_data_filter.count()")
        //      println(cvr_data_filter.count())


        val other_click = get_other_click(sc,stat_date_not)
        val all_click = clk_data.union(other_click)
        val clk_data_filter = all_click.except(cvr_data).withColumn("y",lit(1)).withColumn("z",lit(0))
        //      clk_data_filter.createOrReplaceTempView("clk_data_filter")
        //      clk_data_filter.show()
        //      println("clk_data_filter.count()")
        //      println(clk_data_filter.count())


        val union_data = imp_data_filter.union(clk_data_filter).union(cvr_data_filter)
        union_data.createOrReplaceTempView("union_data")
        //      union_data.show()
        //      println("union_data.count()")
        //      println(union_data.count())
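
        // Enrich each sample via left joins: diary level-1 tag (clevel1_id), service level-1 tag
        // (slevel1_id), diary city name (ccity_name) and service/hospital city id (scity_id).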



        val union_data_clabel = sc.sql(
          s"""
             |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,
             |  c.level1_id as clevel1_id
             |from union_data a
             |left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id
             |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
             |where b.partition_date='${stat_date_not}'
             |and c.partition_date='${stat_date_not}'
         """.stripMargin
        )
        union_data_clabel.createOrReplaceTempView("union_data_clabel")
        //      union_data_clabel.show()

        val union_data_slabel = sc.sql(
          s"""
             |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,
             |  c.level1_id as slevel1_id
             |from union_data_clabel a
             |left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id
             |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
             |where b.partition_date='${stat_date_not}'
             |and c.partition_date='${stat_date_not}'
         """.stripMargin
        )
        union_data_slabel.createOrReplaceTempView("union_data_slabel")
        //      union_data_slabel.show()


        val union_data_ccity_name = sc.sql(
          s"""
             |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
             |  c.name as ccity_name
             |from union_data_slabel a
             |left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id
             |left join src_zhengxing_api_tag c on b.tag_id=c.id
             | where c.tag_type=4
         """.stripMargin
        )
        union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name")
        //      union_data_ccity_name.show()

        val union_data_scity_id = sc.sql(
          s"""
             |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
             |  d.city_id as scity_id
             |from union_data_ccity_name a
             |left join online.tl_meigou_service_view b on a.diary_service_id=b.id
             |left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id
             |left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id
             |where b.partition_date='${stat_date_not}'
             |and c.partition_date='${stat_date_not}'
             |and d.partition_date='${stat_date_not}'
         """.stripMargin
        )
        union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
        union_data_scity_id.show()
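
        // Deduplicate to one row per (device_id, cid_id), then append the day's samples to
        // jerry_test.esmm_train_data over JDBC.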

        val union_data_scity_id2 = sc.sql(
          s"""
             |select device_id,cid_id,first(stat_date) stat_date,first(ucity_id) ucity_id,first(diary_service_id) diary_service_id,first(y) y,
             |first(z) z,first(clevel1_id) clevel1_id,first(slevel1_id) slevel1_id,first(ccity_name) ccity_name,first(scity_id) scity_id
             |from union_data_scity_id
             |group by device_id,cid_id
         """.stripMargin
        )

        GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_train_data",SaveMode.Append)

      } else {
        println("esmm_train_data already have param.date data")
      }

      sc.stop()

    }
  }
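
  // Collect yesterday's diary-card clicks from channels other than the featured ("精选") home
  // feed: non-featured home tabs, detail-page recommendations, search results and non-home pages.
  // Promotion channels and spam devices are filtered out, and ml_community_diary_updates supplies
  // each diary's service_id.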
  def get_other_click(spark:SparkSession,yesterday:String): DataFrame ={
    var result01 = spark.sql(
      s"""
         |select from_unixtime(unix_timestamp('${yesterday}', 'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
         |device["device_id"] as device_id,channel as device_type,
         |city_id,params['business_id'] as cid
         |from online.tl_hdfs_maidian_view where partition_date = '$yesterday'
         |and action = 'on_click_diary_card' and params['tab_name'] != '精选'
         |and params['page_name'] = 'home'
       """.stripMargin
    )
    //    println(result01.count())
    //    result01.show(6)


    val recommend = spark.sql(
      s"""
         |select from_unixtime(unix_timestamp('${yesterday}', 'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
         |device["device_id"] as device_id,channel as device_type,
         |city_id,params["business_id"] as cid
         |from online.tl_hdfs_maidian_view where partition_date = '$yesterday'
         |and action = 'diarybook_detail_click_recommend_block' and params["business_type"] = "diary"
       """.stripMargin
    )
    //    println("详情页推荐日记:")
    //    println(recommend.count())
    //    recommend.show(6)

    val search_zonghe = spark.sql(
      s"""
         |select from_unixtime(unix_timestamp('${yesterday}', 'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
         |device["device_id"] as device_id,channel as device_type,city_id,params["business_id"] as cid
         |from online.tl_hdfs_maidian_view where partition_date = '$yesterday'
         |and action = 'search_result_click_infomation_item' and params["business_type"] = "diary"
       """.stripMargin
    )
    //    println("搜索综合:")
    //    println(search_zonghe.count())
    //    search_zonghe.show(6)

    val non_home = spark.sql(
      s"""
         |select from_unixtime(unix_timestamp('${yesterday}', 'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
         |device["device_id"] as device_id,channel as device_type,city_id,params["diary_id"] as cid
         |from online.tl_hdfs_maidian_view where partition_date = '$yesterday'
         |and action = 'on_click_diary_card' and params['page_name'] != 'home'
       """.stripMargin
    )
    //    println("non home:")
    //    println(non_home.count())
    //    non_home.show(6)

    result01 = result01.union(recommend).union(search_zonghe).union(non_home)
    //    println(result01.count())

    result01.createOrReplaceTempView("temp_result")

    val result02 = spark.sql(
      s"""
         |select * from temp_result
         |where device_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
         |        ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
         |        ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
         |        ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
         |        ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5')
         |       and device_id not in
         |       (SELECT cl_id
         |        FROM online.ml_hospital_spam_pv_day
         |        WHERE partition_date>='20180402' AND partition_date<'${yesterday}'
         |              AND pv_ratio>=0.95
         |        UNION ALL
         |        SELECT cl_id
         |        FROM online.ml_hospital_spam_pv_month
         |        WHERE partition_date>='20171101' AND partition_date<'${yesterday}'
         |            AND pv_ratio>=0.95
         |        )
       """.stripMargin
    )

    result02.createOrReplaceTempView("temp_result02")

    val result_dairy = spark.sql(
      s"""
         |select
         |    re.stat_date as stat_date,
         |    re.device_id as device_id,
         |    re.city_id as ucity_id,
         |    re.cid as cid_id,
         |    da.service_id as diary_service_id
         |from temp_result02 re
         |left join online.ml_community_diary_updates da
         |on re.cid = da.diary_id
         |where da.partition_date='${yesterday}'
       """.stripMargin
    ).distinct()
    result_dairy
  }

}


object EsmmPredData {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("EsmmData")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }



  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "eagle",tableName = "src_mimas_prod_api_diary_tags")
      ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag")
      ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")
      ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
      ti.tidbMapTable("jerry_prod", "nd_device_cid_similarity_matrix")
      ti.tidbMapTable("eagle","ffm_diary_queue")
      ti.tidbMapTable("eagle","search_queue")
      ti.tidbMapTable(dbName = "jerry_test",tableName = "esmm_train_data")
      ti.tidbMapTable("eagle","biz_feed_diary_queue")
      ti.tidbMapTable("jerry_prod","data_feed_exposure_precise")

      import sc.implicits._

      val yesteday_have_seq = GmeiConfig.getMinusNDate(1)

      //nearby_data
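      // Candidate queues per device/city from three sources (similarity matrix, FFM diary queue,
      // search queue), restricted to devices that clicked the feed yesterday.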
      val raw_data = sc.sql(
        s"""
           |select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from
           |(select device_id,if(city_id='world','worldwide',city_id) city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix
           |union
           |select device_id,if(city_id='world','worldwide',city_id) city_id,native_queue as merge_queue from ffm_diary_queue
           |union
           |select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1
           |where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='${yesteday_have_seq}')
         """.stripMargin
      )
      raw_data.show()


      val raw_data1 = raw_data.rdd.groupBy(_.getAs[String]("device_city")).map {
        case (device_city, cid_data) =>
          val device_id = Try(device_city.split(",")(0)).getOrElse("")
          val city_id = Try(device_city.split(",")(1)).getOrElse("")
          val cids = Try(cid_data.toSeq.map(_.getAs[String]("merge_queue").split(",")).flatMap(_.zipWithIndex).sortBy(_._2).map(_._1).distinct.take(500).mkString(",")).getOrElse("")
          (device_id,city_id ,s"$cids")
      }.filter(_._3!="").toDF("device_id","city_id","merge_queue")
      println("nearby_device_count",raw_data1.count())

      val start = LocalDate.now().minusDays(14).toString
      val sql =
        s"""
           |select distinct device_id,cid_id from data_feed_exposure_precise
           |where stat_date >= "$start" and cid_type = "diary"
       """.stripMargin
      val history = sc.sql(sql).repartition(200).rdd
        .map(x =>(x(0).toString,x(1).toString)).groupByKey().map(x => (x._1,x._2.mkString(",")))
        .toDF("device_id","cid_set")
      history.persist()
      history.createOrReplaceTempView("history")

      if (history.take(1).nonEmpty){
        raw_data1.createOrReplaceTempView("r")
        val sql_nearby_filter =
          s"""
             |select r.device_id,r.city_id,r.merge_queue,history.cid_set from r
             |left join history on r.device_id = history.device_id
       """.stripMargin
        val df = sc.sql(sql_nearby_filter).na.fill("").rdd
          .map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString))
          .map(x => (x._1,x._2,x._3.split(",").diff(x._4.split(",")).mkString(",")))
          .toDF("device_id","city_id","merge_queue")
        df.createOrReplaceTempView("raw_data1")
      }else{
        raw_data1.createOrReplaceTempView("raw_data1")
      }

      val raw_data2 = sc.sql(
        s"""
           |select device_id,city_id as ucity_id,explode(split(merge_queue, ',')) as cid_id from raw_data1
         """.stripMargin
      ).withColumn("label",lit(1))
      raw_data2.createOrReplaceTempView("raw_data2")
      println("nearby_explode_count",raw_data2.count())


      // native_data
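      // City-level native diary queue joined to yesterday's exposed devices.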
      val native_data = sc.sql(
        s"""
           |select distinct a.device_id,a.city_id,b.native_queue from data_feed_exposure a
           |left join (select if(city_id='world','worldwide',city_id) city_id,native_queue from biz_feed_diary_queue) b
           |on a.city_id = b.city_id
           |where a.stat_date='${yesteday_have_seq}' and b.native_queue != ""
         """.stripMargin
      )
      println("native_device_count",native_data.count())

      if (history.take(1).nonEmpty){
        native_data.createOrReplaceTempView("temp")
        val sql_native_filter =
          s"""
             |select t.device_id,t.city_id,t.native_queue,history.cid_set from temp t
             |left join history on t.device_id = history.device_id
       """.stripMargin
        val df = sc.sql(sql_native_filter).na.fill("").rdd
          .map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString))
          .map(x => (x._1,x._2,x._3.split(",").diff(x._4.split(",")).mkString(",")))
          .toDF("device_id","city_id","native_queue")
        df.createOrReplaceTempView("native_data")
      }else{
        native_data.createOrReplaceTempView("native_data")
      }

      val native_data1 = sc.sql(
        s"""
           |select device_id,city_id as ucity_id,explode(split(native_queue,',')) as cid_id from native_data
          """.stripMargin
      ).withColumn("label",lit(0))
      native_data1.createOrReplaceTempView("native_data1")
      println("native_explode_count",native_data1.count())



      //union
      val union_data = sc.sql(
        s"""
           |select device_id,ucity_id,cid_id,label from native_data1
           |union
           |select device_id,ucity_id,cid_id,label from raw_data2
         """.stripMargin
      )
      union_data.createOrReplaceTempView("raw_data")
      println("union_count",union_data.count())


      //join feat
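      // Attach each diary's service_id, then run the same tag/city enrichment as EsmmData;
      // y and z are filled with lit(0) here, presumably as placeholders so the schema matches
      // the training data. The result overwrites esmm_pre_data.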
      val yesteday = GmeiConfig.getMinusNDate(1).replace("-","")
      val sid_data = sc.sql(
        s"""
           |select distinct
           |  from_unixtime(unix_timestamp(partition_date ,'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
           |  a.device_id,a.ucity_id,a.cid_id,a.label, b.service_id as diary_service_id
           |from raw_data a
           |left join online.ml_community_diary_updates b on a.cid_id = b.diary_id
           |where b.partition_date = '${yesteday}'
         """.stripMargin
      )
//      sid_data.show()
      println(sid_data.count())

      val sid_data_label = sid_data.withColumn("y",lit(0)).withColumn("z",lit(0))
      sid_data_label.createOrReplaceTempView("union_data")


      val union_data_clabel = sc.sql(
        s"""
           |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,
           |  c.level1_id as clevel1_id
           |from union_data a
           |left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id
           |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
           |where b.partition_date='${yesteday}'
           |and c.partition_date='${yesteday}'
         """.stripMargin
      )
      union_data_clabel.createOrReplaceTempView("union_data_clabel")
      //      union_data_clabel.show()

      val union_data_slabel = sc.sql(
        s"""
           |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,a.clevel1_id,
           |  c.level1_id as slevel1_id
           |from union_data_clabel a
           |left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id
           |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
           |where b.partition_date='${yesteday}'
           |and c.partition_date='${yesteday}'
         """.stripMargin
      )
      union_data_slabel.createOrReplaceTempView("union_data_slabel")
      //      union_data_slabel.show()


      val union_data_ccity_name = sc.sql(
        s"""
           |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
           |  c.name as ccity_name
           |from union_data_slabel a
           |left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id
           |left join src_zhengxing_api_tag c on b.tag_id=c.id
           | where c.tag_type=4
         """.stripMargin
      )
      union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name")
      //      union_data_ccity_name.show()

      val union_data_scity_id = sc.sql(
        s"""
           |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
           |  d.city_id as scity_id
           |from union_data_ccity_name a
           |left join online.tl_meigou_service_view b on a.diary_service_id=b.id
           |left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id
           |left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id
           |where b.partition_date='${yesteday}'
           |and c.partition_date='${yesteday}'
           |and d.partition_date='${yesteday}'
         """.stripMargin
      )
      union_data_scity_id.createOrReplaceTempView("union_data_scity_id")

      val union_data_scity_id2 = sc.sql(
        s"""
           |select device_id,cid_id,first(stat_date) stat_date,first(ucity_id) ucity_id,label,first(diary_service_id) diary_service_id,first(y) y,
           |first(z) z,first(clevel1_id) clevel1_id,first(slevel1_id) slevel1_id,first(ccity_name) ccity_name,first(scity_id) scity_id
           |from union_data_scity_id
           |group by device_id,cid_id,label
         """.stripMargin
      )


      //      union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
      println(union_data_scity_id2.count())
      GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_pre_data",SaveMode.Overwrite)




      sc.stop()

    }
  }

}

object GetDiaryPortrait {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = GmeiConfig.getMinusNDate(1)
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("EsmmData")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }



  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")

      val stat_date = param.date.replace("-","")
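
      // Diary portrait: collect each diary's level-1/2/3 tag ids into comma-separated lists
      // and overwrite the diary_feat table.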

      val diary_tag = sc.sql(
        s"""
           |select c.diary_id,
           |		concat_ws(',',collect_set(cast(c.level1_id as string))) as level1_ids,
           |		concat_ws(',',collect_set(cast(c.level2_id as string))) as level2_ids,
           |		concat_ws(',',collect_set(cast(c.level3_id as string))) as level3_ids from
           |	(select a.diary_id,b.level1_id,b.level2_id,b.level3_id
           |		from online.tl_hdfs_diary_tags_view a
           |		left join online.bl_tag_hierarchy_detail b
           |		on a.tag_id = b.id
           |		where a.partition_date = '${stat_date}'
           |		and b.partition_date = '${stat_date}') c
           |	group by c.diary_id
         """.stripMargin
      )
      diary_tag.show()
      println(diary_tag.count())
      val jdbc = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"

      GmeiConfig.writeToJDBCTable(jdbc,diary_tag,"diary_feat",SaveMode.Overwrite)


      sc.stop()

    }
  }

}

object GetDevicePortrait {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = GmeiConfig.getMinusNDate(1)
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("EsmmData")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }



  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "jerry_prod",tableName = "diary_feat")
      import sc.implicits._

      val stat_date = param.date.replace("-","")
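
      // Device portrait: count diary-card clicks per device and level-1 tag for the day,
      // then keep each device's most-clicked tag (max_level1_id / max_level1_count).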

      val device_search_tag = sc.sql(
        s"""
           |select c.device_id,c.stat_date,c.level1_id,count(c.level1_id) as level1_count
           |from (select
           |		a.cl_id as device_id,a.partition_date as stat_date,
           |		COALESCE(a.params['diary_id'], a.params['business_id'], 0) as cid_id,
           |		b.level1_ids as level1_id
           |	from online.tl_hdfs_maidian_view a
           |	left join diary_feat b
           |	on COALESCE(a.params['diary_id'], a.params['business_id'], 0) = b.diary_id
           |	where
           |    b.level1_ids is not null and
           |		a.partition_date = '${stat_date}'
           |		and (a.action = 'on_click_diary_card' or (a.action="full_stack_click_video_card_full_screen_play" and a.params["card_type"]="diary"))) c
           |group by c.device_id,c.level1_id,c.stat_date
         """.stripMargin
      )
      device_search_tag.show()
      println(device_search_tag.count())

      device_search_tag.createOrReplaceTempView("tag_count")

      val max_count_tag = sc.sql(
      s"""
         |select a.device_id,a.stat_date,a.level1_id as max_level1_id,a.level1_count as max_level1_count
         |from tag_count a
         |inner join
         |(select device_id,max(level1_count) as max_count from tag_count group by device_id) b
         |on a.level1_count = b.max_count and a.device_id = b.device_id
       """.stripMargin
    )
//        .rdd.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString))
//      max_count_tag.foreachPartition(GmeiConfig.updateDeviceFeat)
//
//      max_count_tag.take(10).foreach(println)
//      println(max_count_tag.count())

      //drop duplicates
      val max_count_tag_rdd = max_count_tag.rdd.groupBy(_.getAs[String]("device_id")).map {
        case (device_id,data) =>
          val stat_date = data.map(_.getAs[String]("stat_date")).head
          val max_level1_id = data.map(_.getAs[String]("max_level1_id")).head.toString
          val max_level1_count = data.map(_.getAs[Long]("max_level1_count")).head.toString
          (device_id,stat_date,max_level1_id,max_level1_count)
      }.filter(_._1!=null)


      max_count_tag_rdd.foreachPartition(GmeiConfig.updateDeviceFeat)
      max_count_tag_rdd.take(10).foreach(println)
      println(max_count_tag_rdd.count())

      sc.stop()

    }
  }

}

object GetLevelCount {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    path: String = null
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("EsmmData")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("path")
      .text(s"the path you used")
      .action((x,c) => c.copy(path = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }



  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "jerry_prod",tableName = "diary_feat")


      import sc.implicits._
      val stat_date = GmeiConfig.getMinusNDate(1).replace("-","")

//      val diary_queue = sc.read.json(param.path).rdd.map(x => x(0).toString).distinct().collect().toList.mkString(",")
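      // NOTE: a hard-coded sample diary queue is used here; the commented line above shows how
      // the queue would be read from the --path JSON file instead.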
      val diary_queue = "16215222,16204965,15361235,16121397,16277565,15491159,16299587,16296887,15294642,16204934,15649199,16122580,16122580,16122580,16122580,16122580,16122580"
      val diary_level1 = sc.sql(
        s"""
           |select diary_id,explode(split(level1_ids,';')) level1_id from diary_feat
           |where diary_id in (${diary_queue})
         """.stripMargin
      )
      diary_level1.show()
      println(diary_level1.count())

      // diary ids in the breast category (level1_id = 7)
      val cid_xiong = diary_level1.rdd.filter(_.getAs("level1_id")=="7")
      cid_xiong.collect().foreach(println)

      // count the number of diaries per level-1 category
      val level1_count = diary_level1.rdd.map(x => (x(1).toString)).map(level1 => (level1,1)).reduceByKey((a,b) => a+b).sortBy(_._2,false).toDF("level1_id","count")
      level1_count.show()
      level1_count.createOrReplaceTempView("tmp")
      val level1_name = sc.sql(
        s"""
           |select a.level1_id,a.count,b.level1_name from tmp a
           |left join (select distinct level1_id,level1_name from online.bl_tag_hierarchy_detail where partition_date = '${stat_date}') b
           |on a.level1_id = b.level1_id order by a.count desc
         """.stripMargin
      )
      level1_name.show()

      sc.stop()

    }
  }

}

object GetDeviceDuration {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = GmeiConfig.getMinusNDate(1)
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("EsmmData")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }



  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "jerry_prod",tableName = "diary_feat")
      import sc.implicits._

      val stat_date = param.date
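
      // Per-device browse duration summed by level-1 category and pivoted into one column per
      // category. Note that the queries below hard-code stat_date > '2018-12-12' rather than
      // using the date parameter above.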

      val uid_duration = sc.sql(
        s"""
           |select a.device_id,coalesce(a.start_time,a.ndiary_in,0) in_time,coalesce(a.end_time,a.ndiary_out,0) out_time,
           |explode(split(b.level1_ids,';')) level1_id
           |from data_feed_click a
           |left join diary_feat b on a.cid_id = b.diary_id
           |where a.stat_date > '2018-12-12'
         """.stripMargin
      )
      uid_duration.show()

      val uid_duration_last = sc.sql(
        s"""
           |select d.device_id,
           |sum(if(d.level1_id=0,sum_duration,0)) as kongbai,
           |sum(if(d.level1_id=1,sum_duration,0)) as yanbu,
           |sum(if(d.level1_id=10,sum_duration,0)) as simi,
           |sum(if(d.level1_id=1024,sum_duration,0)) as zitizhifang,
           |sum(if(d.level1_id=1080,sum_duration,0)) as banyongjiu,
           |sum(if(d.level1_id=11,sum_duration,0)) as yachi,
           |sum(if(d.level1_id=12,sum_duration,0)) as kouchun,
           |sum(if(d.level1_id=13,sum_duration,0)) as erbu,
           |sum(if(d.level1_id=2,sum_duration,0)) as bibu,
           |sum(if(d.level1_id=2053,sum_duration,0)) as remen,
           |sum(if(d.level1_id=2054,sum_duration,0)) as banyongjiuzhuang,
           |sum(if(d.level1_id=2212,sum_duration,0)) as jiankangguanli,
           |sum(if(d.level1_id=2214,sum_duration,0)) as qita,
           |sum(if(d.level1_id=3,sum_duration,0)) as lunkuo,
           |sum(if(d.level1_id=4,sum_duration,0)) as shoushen,
           |sum(if(d.level1_id=5,sum_duration,0)) as pifu,
           |sum(if(d.level1_id=6933,sum_duration,0)) as shenghuo,
           |sum(if(d.level1_id=7,sum_duration,0)) as xiongbu,
           |sum(if(d.level1_id=9,sum_duration,0)) as maofa,
           |sum(if(d.level1_id=922,sum_duration,0)) as kangshuai,
           |sum(if(d.level1_id=929,sum_duration,0)) as shili,
           |sum(if(d.level1_id=971,sum_duration,0)) as chanhouxiufu,
           |sum(if(d.level1_id=992,sum_duration,0)) as zhushe
           |from
           |	(select c.device_id,c.level1_id,sum(c.duration) as sum_duration from
           |		(select a.device_id,
           |		coalesce(a.end_time,a.ndiary_out,0)-coalesce(a.start_time,a.ndiary_in,0) as duration,
           |		explode(split(b.level1_ids,';')) level1_id
           |		from data_feed_click a
           |		left join diary_feat b on a.cid_id = b.diary_id where a.stat_date > '2018-12-12') c
           |	group by c.device_id,c.level1_id) d
           |group by d.device_id
         """.stripMargin
      )
      uid_duration_last.show()


      sc.stop()

    }
  }

}


object EsmmDataTest {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = GmeiConfig.getMinusNDate(1)
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("EsmmData")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }



  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2
      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "eagle",tableName = "src_mimas_prod_api_diary_tags")
      ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag")
      ti.tidbMapTable(dbName = "jerry_test",tableName = "esmm_click")
      ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure_precise")
      ti.tidbMapTable(dbName = "jerry_test", tableName = "train_data")

      click(sc)

      val max_stat_date = sc.sql(
        s"""
           |select max(stat_date) from train_data
         """.stripMargin
      )
      // max(stat_date) is null when train_data is empty, so guard the toString call
      val max_stat_date_str = max_stat_date.collect().map(s => Option(s(0)).map(_.toString).orNull).head
      println("max_stat_date_str",max_stat_date_str)
      println("param.date",param.date)
      if (max_stat_date_str == null || max_stat_date_str != param.date){
        val stat_date = param.date
        println(stat_date)
        //        val imp_data = sc.sql(
        //          s"""
        //             |select distinct stat_date,device_id,city_id as ucity_id,
        //             |  cid_id,diary_service_id
        //             |from data_feed_exposure
        //             |where cid_type = 'diary'
        //             |and stat_date ='${stat_date}'
        //         """.stripMargin
        //        )

        val imp_data = sc.sql(
          s"""
             |select * from
             |(select stat_date,device_id,city_id as ucity_id,cid_id,diary_service_id
             |from data_feed_exposure_precise
             |where cid_type = 'diary'
             |and stat_date ='${stat_date}'
             |group by stat_date,device_id,city_id,cid_id,diary_service_id) a
         """.stripMargin
        )
        //      imp_data.show()
        //      println("imp_data.count()")
        //      println(imp_data.count())


        val clk_data = sc.sql(
          s"""
             |select distinct stat_date,device_id,city_id as ucity_id,cid_id,diary_service_id
             |from esmm_click
             |where stat_date ='${stat_date}'
         """.stripMargin
        )
        //      clk_data.show()
        //      println("clk_data.count()")
        //      println(clk_data.count())



        val imp_data_filter = imp_data.except(clk_data).withColumn("y",lit(0)).withColumn("z",lit(0))
        //      imp_data_filter.createOrReplaceTempView("imp_data_filter")
        //      imp_data_filter.show()
        //      println("imp_data_filter.count()")
        //      println(imp_data_filter.count())


        val stat_date_not = stat_date.replace("-","")
        val cvr_data = sc.sql(
          s"""
             |select distinct
             |  from_unixtime(unix_timestamp(partition_date ,'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
             |  cl_id as device_id,city_id as ucity_id,
             |  params["referrer_id"] as cid_id,params["business_id"] as diary_service_id
             |from online.tl_hdfs_maidian_view
             |where action='page_view'
             |and partition_date ='${stat_date_not}'
             |and params['page_name'] = 'welfare_detail'
             |and params['referrer'] = 'diary_detail'
         """.stripMargin
        )

        val cvr_data_filter = cvr_data.withColumn("y",lit(1)).withColumn("z",lit(1))
        //      cvr_data_filter.createOrReplaceTempView("cvr_data_filter")
        //      cvr_data_filter.show()
        //      println("cvr_data_filter.count()")
        //      println(cvr_data_filter.count())



        val clk_data_filter = clk_data.except(cvr_data).withColumn("y",lit(1)).withColumn("z",lit(0))
        //      clk_data_filter.createOrReplaceTempView("clk_data_filter")
        //      clk_data_filter.show()
        //      println("clk_data_filter.count()")
        //      println(clk_data_filter.count())


        val union_data = imp_data_filter.union(clk_data_filter).union(cvr_data_filter)
        union_data.createOrReplaceTempView("union_data")
        //      union_data.show()
        //      println("union_data.count()")
        //      println(union_data.count())



        val union_data_clabel = sc.sql(
          s"""
             |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,
             |  c.level1_id as clevel1_id
             |from union_data a
             |left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id
             |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
             |where b.partition_date='${stat_date_not}'
             |and c.partition_date='${stat_date_not}'
         """.stripMargin
        )
        union_data_clabel.createOrReplaceTempView("union_data_clabel")
        //      union_data_clabel.show()

        val union_data_slabel = sc.sql(
          s"""
             |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,
             |  c.level1_id as slevel1_id
             |from union_data_clabel a
             |left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id
             |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
             |where b.partition_date='${stat_date_not}'
             |and c.partition_date='${stat_date_not}'
         """.stripMargin
        )
        union_data_slabel.createOrReplaceTempView("union_data_slabel")
        //      union_data_slabel.show()


        val union_data_ccity_name = sc.sql(
          s"""
             |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
             |  c.name as ccity_name
             |from union_data_slabel a
             |left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id
             |left join src_zhengxing_api_tag c on b.tag_id=c.id
             | where c.tag_type=4
         """.stripMargin
        )
        union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name")
        //      union_data_ccity_name.show()

        val union_data_scity_id = sc.sql(
          s"""
             |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
             |  d.city_id as scity_id
             |from union_data_ccity_name a
             |left join online.tl_meigou_service_view b on a.diary_service_id=b.id
             |left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id
             |left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id
             |where b.partition_date='${stat_date_not}'
             |and c.partition_date='${stat_date_not}'
             |and d.partition_date='${stat_date_not}'
         """.stripMargin
        )
        union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
        union_data_scity_id.show()

        val union_data_scity_id2 = sc.sql(
          s"""
             |select device_id,cid_id,first(stat_date) stat_date,first(ucity_id) ucity_id,first(diary_service_id) diary_service_id,first(y) y,
             |first(z) z,first(clevel1_id) clevel1_id,first(slevel1_id) slevel1_id,first(ccity_name) ccity_name,first(scity_id) scity_id
             |from union_data_scity_id
             |group by device_id,cid_id
         """.stripMargin
        )


        GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="train_data",SaveMode.Append)

      } else {
        println("train_data already have param.date data")
      }

      sc.stop()

    }
  }
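
  // Collect yesterday's diary-card clicks on the featured ("精选") home tab, drop promotion/test
  // channels and spam devices, attach each diary's service_id and append the result to esmm_click,
  // unless that day is already present.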
  def click(spark:SparkSession): Unit ={
    val yesterday = LocalDate.now().minusDays(1).toString.replace("-","")
    println(yesterday)
    val stat_yesterday = LocalDate.now().minusDays(1).toString
    val max_stat_date = spark.sql(
      s"""
         |select max(stat_date) from esmm_click
         """.stripMargin
    )
    // max(stat_date) is null when esmm_click is empty, so guard the toString call
    val max = max_stat_date.collect().map(s => Option(s(0)).map(_.toString).orNull).head
    println("max_stat_date",max)
    if (max == null || max != stat_yesterday){
      val result01 = spark.sql(
        s"""
           |select from_unixtime(unix_timestamp('${yesterday}', 'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
           |device["device_id"] as device_id,channel as device_type,
           |city_id,params['diary_id'] as cid
           |from online.tl_hdfs_maidian_view where partition_date = '$yesterday'
           |and action = 'on_click_diary_card' and params['tab_name'] = '精选'
           |and params['page_name'] = 'home'
       """.stripMargin
      )

      result01.createOrReplaceTempView("temp_result")

      val result02 = spark.sql(
        s"""
           |select * from temp_result
           |where device_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
           |        ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
           |        ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
           |        ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
           |        ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5')
           |       and device_id not in
           |       (SELECT cl_id
           |        FROM online.ml_hospital_spam_pv_day
           |        WHERE partition_date>='20180402' AND partition_date<'${yesterday}'
           |              AND pv_ratio>=0.95
           |        UNION ALL
           |        SELECT cl_id
           |        FROM online.ml_hospital_spam_pv_month
           |        WHERE partition_date>='20171101' AND partition_date<'${yesterday}'
           |            AND pv_ratio>=0.95
           |        )
       """.stripMargin
      )

      result02.createOrReplaceTempView("temp_result02")

      val result_dairy = spark.sql(
        s"""
           |select
           |    re.stat_date as stat_date,
           |    re.device_id as device_id,
           |    re.device_type as device_type,
           |    re.cid as cid_id,
           |    re.city_id as city_id,
           |    da.service_id as diary_service_id
           |from temp_result02 re
           |left join online.ml_community_diary_updates da
           |on re.cid = da.diary_id
           |where da.partition_date='${yesterday}'
       """.stripMargin
      )

      val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
      GmeiConfig.writeToJDBCTable(jdbcuri,result_dairy, table="esmm_click",SaveMode.Append)
      println("data insert")
    }else{
      println("data already exists")
    }

  }

}