package com.gmei

import java.io.Serializable
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.sql.{Row, SaveMode, SparkSession, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams

import scala.util.parsing.json.JSON

object temp_count {
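
  // Daily diary CTR metrics: click and exposure counts for old users, for all
  // users, and for a contrast group of old users, over a fixed city list.
  // Results are appended to the TiDB table ffm_diary_ctr.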

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String] ("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")


      val stat_date = GmeiConfig.getMinusNDate(1)
//      val stat_date = param.date
      val partition_date = stat_date.replace("-","")
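      // Old users: devices with active_type = '4', excluding promotion and
      // fake-traffic first channels.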
      val device_id_oldUser = sc.sql(
        s"""
           |select distinct(device_id) as device_id
           |from online.ml_device_day_active_status
           |where active_type = '4'
           |and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
           |    ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
           |    ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
           |    ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
           |    ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
           |    ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
           |    ,'promotion_shike','promotion_julang_jl03','','unknown')
           |and partition_date ='${partition_date}'
         """.stripMargin
      )
      device_id_oldUser.createOrReplaceTempView("device_id_old")


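      // Diary clicks and exposures, first for old users then for all users,
      // restricted to the city list and excluding blacklisted devices.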
      val clk_count_oldUser = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(jd.cid_id) as clk_count_oldUser
           |from data_feed_click jd inner join device_id_old
           |on jd.device_id = device_id_old.device_id
           |where  (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
           |and jd.device_id not in (select device_id from blacklist)
           |and jd.city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan")
           |and jd.stat_date ='${stat_date}'
         """.stripMargin
      )

      val imp_count_oldUser = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(cid_id) as imp_count_oldUser
           |from data_feed_exposure je inner join device_id_old
           |on je.device_id = device_id_old.device_id
           |where je.cid_type = 'diary'
           |and je.device_id not in (select device_id from blacklist)
           |and je.city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan")
           |and je.stat_date ='${stat_date}'
         """.stripMargin
      )

      val clk_count_all = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(cid_id) as clk_count_all
           |from data_feed_click
           |where  (cid_type = 'diary' or cid_type = 'diary_video')
           |and device_id not in (select device_id from blacklist)
           |and city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan")
           |and stat_date ='${stat_date}'
         """.stripMargin
      )

      val imp_count_all = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(cid_id) as imp_count_all
           |from data_feed_exposure
           |where cid_type = 'diary'
           |and device_id not in (select device_id from blacklist)
           |and city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan")
           |and stat_date ='${stat_date}'
         """.stripMargin
      )

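      // Contrast group: the same click/exposure counts restricted to old users
      // whose device_id ends in '1' (regexp '1$').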
      val clk_count_oldUser_Contrast = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(cid_id) as clk_count_oldUser_Contrast
           |from data_feed_click jd inner join device_id_old
           |on jd.device_id = device_id_old.device_id
           |where  (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
           |and jd.device_id regexp'1$$'
           |and jd.device_id not in (select device_id from blacklist)
           |and jd.city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan")
           |and jd.stat_date ='${stat_date}'
         """.stripMargin
      )

      val imp_count_oldUser_Contrast = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(cid_id) as imp_count_oldUser_Contrast
           |from data_feed_exposure je inner join device_id_old
           |on je.device_id = device_id_old.device_id
           |where je.cid_type = 'diary'
           |and je.device_id regexp'1$$'
           |and je.device_id not in (select device_id from blacklist)
           |and je.city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan")
           |and je.stat_date ='${stat_date}'
         """.stripMargin
      )




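      // Join the six metrics on stat_date and append to TiDB (ffm_diary_ctr).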
      val result1 = clk_count_oldUser.join(imp_count_oldUser,"stat_date")
        .join(clk_count_all,"stat_date")
        .join(imp_count_all,"stat_date")
        .join(clk_count_oldUser_Contrast,"stat_date")
        .join(imp_count_oldUser_Contrast,"stat_date")
      result1.show()

      GmeiConfig.writeToJDBCTable(result1, "ffm_diary_ctr", SaveMode.Append)


    }


  }

}




object Repeated_content_recommendation {
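
  // Single-day repeated-exposure rate: the share of diary impressions hitting
  // the same (device, diary) pair two or more times, computed separately for
  // new and old users and appended to Repeated_evaluation_indicator.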

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String] ("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure_precise")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")


//      val stat_date = GmeiConfig.getMinusNDate(1)
      val stat_date = param.date
      val partition_date = stat_date.replace("-","")



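      // Suspected agency devices (pv_ratio >= 0.95 in the hospital spam views)
      // plus the blacklist; filtered out of the user sets below.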
      val agency_id = sc.sql(
        s"""
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_day
           |WHERE partition_date >= '20180402'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
           |UNION ALL
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_month
           |WHERE partition_date >= '20171101'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
           |UNION ALL
           |select distinct(device_id)
           |from blacklist
         """.stripMargin
      )
      agency_id.createOrReplaceTempView("agency_id")



      val device_id_oldUser = sc.sql(
        s"""
           |select distinct(om.device_id) as device_id
           |from online.ml_device_day_active_status om left join agency_id
           |on om.device_id = agency_id.device_id
           |where om.active_type = '4'
           |and om.first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
           |    ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
           |    ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
           |    ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
           |    ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
           |    ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
           |    ,'promotion_shike','promotion_julang_jl03','','unknown')
           |and om.partition_date ='${partition_date}'
           |and agency_id.device_id is null
         """.stripMargin
      )
      device_id_oldUser.createOrReplaceTempView("device_id_old")
      device_id_oldUser.show()


      val device_id_newUser = sc.sql(
        s"""
           |select distinct(om.device_id) as device_id
           |from online.ml_device_day_active_status om left join agency_id
           |on om.device_id = agency_id.device_id
           |where om.active_type != '4'
           |and om.first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
           |    ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
           |    ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
           |    ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
           |    ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
           |    ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
           |    ,'promotion_shike','promotion_julang_jl03','','unknown')
           |and om.partition_date ='${partition_date}'
           |and agency_id.device_id is null
         """.stripMargin
      )
      device_id_newUser.createOrReplaceTempView("device_id_new")
      device_id_newUser.show()



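      // One 'device_id|cid_id' string per exposure event for new users; counting
      // duplicates below measures how often the same diary is re-recommended.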
      val exp_diary_new = sc.sql(
        s"""
           |select concat_ws('|',de.device_id,de.cid_id)
           |from data_feed_exposure  de inner join device_id_new
           |on de.device_id=device_id_new.device_id
           |where de.cid_type = 'diary'
           |and de.stat_date ='${stat_date}'
         """.stripMargin
      )
      val get_result_new = exp_diary_new.rdd.map((_, 1)).reduceByKey(_ + _)
        .sortBy(_._2, false)

      val more_than2_new = get_result_new.filter(_._2 >= 2).map(_._2).reduce((x, y) => x + y)
      println(more_than2_new)
      val all_new = get_result_new.map(_._2).reduce((x, y) => x + y)
      println(all_new)
      val repeated_rate_new = more_than2_new / all_new.toDouble
      println(repeated_rate_new)


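      // Same repeated-exposure computation for old users.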
      val exp_diary_old = sc.sql(
        s"""
           |select concat_ws('|',de.device_id,de.cid_id)
           |from data_feed_exposure de inner join device_id_old
           |on de.device_id=device_id_old.device_id
           |where de.cid_type = 'diary'
           |and de.stat_date ='${stat_date}'
               """.stripMargin
      )
      val get_result_old = exp_diary_old.rdd.map((_, 1)).reduceByKey(_ + _)
        .sortBy(_._2, false)

      val more_than2_old = get_result_old.filter(_._2 >= 2).map(_._2).reduce((x, y) => x + y)
      println(more_than2_old)
      val all_old = get_result_old.map(_._2).reduce((x, y) => x + y)
      println(all_old)
      val repeated_rate_old = more_than2_old / all_old.toDouble
      println(repeated_rate_old)


      val result2 = List((stat_date, more_than2_old, all_old, more_than2_new, all_new))
      val df2 = sc.createDataFrame(result2).toDF("stat_date", "old_rep_count", "old_imp_all", "new_rep_count", "new_imp_all")

      GmeiConfig.writeToJDBCTable(df2, table = "Repeated_evaluation_indicator", SaveMode.Append)

    }


  }

}


object Repeated_content_recommendation_moreday {
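
  // Multi-day repeated-exposure rate: over the last 8 days, the share of diary
  // impressions that are repeats for the same device. Appended to the TiDB
  // table Repeated_content_recommendation_moreday.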

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String] ("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure_precise")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")


//      val stat_date = GmeiConfig.getMinusNDate(1)
//      val stat_date = "2019-01-16"
//      val partition_date = stat_date.replace("-","")


      val now = new Date()
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
      val date = dateFormat.format(now.getTime - 86400000L * 8)   // 8 days ago: start of the window
      val yesterday = dateFormat.format(now.getTime - 86400000L)  // yesterday: the reported stat date


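      // Per-device lists of diary ids exposed each day of the window; keep only
      // devices with exposures on two or more days.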
      val exp_diary = sc.sql(
        s"""
           |select stat_date,device_id,concat_ws(',',collect_set(distinct cid_id)) as exposure_diary
           |from data_feed_exposure_precise
           |where cid_type = 'diary'
           |and stat_date >='${date}'
           |and device_id not in (select device_id from blacklist)
           |group by device_id,stat_date
         """.stripMargin
      ).rdd.map(row => (row(1).toString, row(2).toString)).groupByKey()
        .filter(x => x._2.size > 1)

      // Print sample results (debugging)
//      val temp=exp_diary.take(10).foreach(println)

      // Per device: (repeated exposure count, total exposure count) over the window
      val count_imp = exp_diary.map(_._2).map(row => row.flatMap(x => x.split(",")).toArray)
        .map(x => (x.distinct.size, x.size)).map(x => (x._2 - x._1, x._2)).collect()
      val repeated_count = count_imp.map(x => x._1).reduce((x, y) => x + y)
      val total_count = count_imp.map(x => x._2).reduce((x, y) => x + y)

      val repeated_rate = repeated_count / total_count.toDouble
      val result = List((yesterday, repeated_rate))
      println(result)
      val df_result = sc.createDataFrame(result)

      GmeiConfig.writeToJDBCTable(df_result, table = "Repeated_content_recommendation_moreday", SaveMode.Append)


    }
  }
}





object GetHiveSearchData {
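
  // Search CTR report: exposure and click counts for five search verticals
  // (diary, service, content, doctor, hospital), appended to the TiDB table
  // GetHiveSearchData_CTR.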

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  case class GetHiveSearchData_temp(stat_date: String, diaryExposureVal: String, diaryClickNum: String,
                                    meigouExposureVal: String, meigouClickNum: String,
                                    searchResultExposureVal: String, searchResultClickNum: String,
                                    searchDoctorExposureVal: String, searchDoctorClickNum: String,
                                    searchHospitalExposureVal: String, searchHospitalClickNum: String)

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String] ("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure_precise")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "GetHiveSearchData_CTR")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")


      val stat_date = GmeiConfig.getMinusNDate(1)
//      val stat_date = param.date
      val partition_date = stat_date.replace("-","")


      val strDiaryExposureAction = "/api/search/v2/diary"
      val strDiaryClickAction = "search_result_more_diary_click_item"    // to be confirmed
      val (diaryExposureVal, diaryClickNum, diaryExposureMapCount, diaryExposureFilterCount) =
        GetSearchResultData(sc, strDiaryExposureAction, strDiaryClickAction, stat_date)


      val strMeigouExposureAction = "/api/search/v2/service"
      val strMeigouClickAction = "search_result_welfare_click_item"
      val (meigouExposureVal, meigouClickNum, meigouExposureMapCount, meigouExposureFilterCount) =
        GetSearchResultData(sc, strMeigouExposureAction, strMeigouClickAction, stat_date)


      val strSearchResultExposureAction = "/api/search/v2/content"
      val strSearchResultClickAction = "search_result_click_diary_item"  // to be confirmed
      val (searchResultExposureVal, searchResultClickNum, searchResultExposureMapCount, searchResultExposureFilterCount) =
        GetSearchResultData(sc, strSearchResultExposureAction, strSearchResultClickAction, stat_date)


      val strSearchDoctorExposureAction = "/api/search/v2/doctor"
      val strSearchDoctorClickAction = "search_result_doctor_click_item"
      val (searchDoctorExposureVal, searchDoctorClickNum, searchDoctorExposureMapCount, searchDoctorExposureFilterCount) =
        GetSearchResultData(sc, strSearchDoctorExposureAction, strSearchDoctorClickAction, stat_date)


      val strSearchHospitalExposureAction = "/api/search/v2/hospital"
      val strSearchHospitalClickAction = "search_result_hospital_click_item"
      val (searchHospitalExposureVal, searchHospitalClickNum, searchHospitalExposureMapCount, searchHospitalExposureFilterCount) =
        GetSearchResultData(sc, strSearchHospitalExposureAction, strSearchHospitalClickAction, stat_date)



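      // Agency devices (pv_ratio >= 0.95 in the hospital spam views) plus the
      // blacklist, excluded from the click counts below.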
      val jigou_id = sc.sql(
        s"""
           |SELECT cl_id
           |FROM online.ml_hospital_spam_pv_day
           |WHERE partition_date>='20180402' AND partition_date<'${partition_date}'
           |AND pv_ratio>=0.95
           |UNION ALL
           |SELECT cl_id
           |FROM online.ml_hospital_spam_pv_month
           |WHERE partition_date>='20171101' AND partition_date<'${partition_date}'
           |AND pv_ratio>=0.95
           |UNION ALL
           |select device_id as cl_id from blacklist
       """.stripMargin
      )
      jigou_id.createOrReplaceTempView("jigou_id")


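      // Diary-card clicks on the search_result_diary page, added to the diary
      // vertical's click count.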
      val diary_clickSql = sc.sql(
        s"""
           |select
           |count(1) click_num
           |from  online.tl_hdfs_maidian_view ov left join jigou_id
           |on ov.cl_id = jigou_id.cl_id
           |where ov.partition_date='${partition_date}'
           |and ov.action='on_click_diary_card'
           |and ov.params['page_name']='search_result_diary'
           |and jigou_id.cl_id is null
       """.stripMargin
      )

      val diary_clickArray = diary_clickSql.collect()
      val diary_click_num = diary_clickArray(0).getAs[Long]("click_num")

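      // Diary-card clicks on the search_result_more page, added to the content
      // vertical's click count.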
      val content_diary_clickSql = sc.sql(
        s"""
           |select
           |count(1) click_num
           |from  online.tl_hdfs_maidian_view ov left join jigou_id
           |on ov.cl_id = jigou_id.cl_id
           |where ov.partition_date='${partition_date}'
           |and ov.action='on_click_diary_card'
           |and  ov.params['page_name']='search_result_more'
           |and jigou_id.cl_id is null
       """.stripMargin
      )

      val content_diary_clickArray:Array[Row] = content_diary_clickSql.collect()
      val content_diary_click_num:Long = content_diary_clickArray(0).getAs[Long]("click_num")




      println("searchDiaryExposureVal:" + diaryExposureVal + "\tsearchDiaryClickNum:" + diary_click_num + "\tclickRate:" + (diary_click_num.floatValue()/diaryExposureVal.floatValue()).formatted("%.2f"))
      println("searchMeigouExposureVal:" + meigouExposureVal + "\tsearchMeigouClickNum:" + meigouClickNum + "\tclickRate:" + (meigouClickNum.floatValue()/meigouExposureVal.floatValue()).formatted("%.2f"))
      println("searchResultExposureVal:" + searchResultExposureVal + "\tsearchResultClickNum:" + (searchResultClickNum+content_diary_click_num) + "\tclickRate:" + ((searchResultClickNum+content_diary_click_num).floatValue()/searchResultExposureVal.floatValue()).formatted("%.2f"))
      println("searchDoctorExposureVal:" + searchDoctorExposureVal + "\tsearchDoctorClickNum:" + searchDoctorClickNum + "\tclickRate:" + (searchDoctorClickNum.floatValue()/searchDoctorExposureVal.floatValue()).formatted("%.2f"))
      println("searchHospitalExposureVal:" + searchHospitalExposureVal + "\tsearchHospitalClickNum:" + searchHospitalClickNum + "\tclickRate:" + (searchHospitalClickNum.floatValue()/searchHospitalExposureVal.floatValue()).formatted("%.2f"))



//      val add_data = sc.sql(
//        s"""
//           |insert into table GetHiveSearchData_CTR(stat_date,diaryExposureVal,diaryClickNum,meigouExposureVal,meigouClickNum,searchResultExposureVal,searchResultClickNum,searchDoctorExposureVal,searchDoctorClickNum,searchHospitalExposureVal,searchHospitalClickNum)
//           |values ('${stat_date}','${diaryExposureVal}','${(diary_click_num+diaryClickNum)}','${meigouExposureVal}','${meigouClickNum}','${searchResultExposureVal}','${(searchResultClickNum+content_diary_click_num)}','${searchDoctorExposureVal}','${searchDoctorClickNum}','${searchHospitalExposureVal}','${searchHospitalClickNum}')
//       """.stripMargin
//      )


      import sc.implicits._
      val result = List((stat_date, diaryExposureVal, diary_click_num + diaryClickNum, meigouExposureVal,
        meigouClickNum, searchResultExposureVal, searchResultClickNum + content_diary_click_num,
        searchDoctorExposureVal, searchDoctorClickNum, searchHospitalExposureVal, searchHospitalClickNum))
      val df_result = sc.sparkContext.parallelize(result).toDF("stat_date", "diaryExposureVal", "diaryClickNum",
        "meigouExposureVal", "meigouClickNum", "searchResultExposureVal", "searchResultClickNum",
        "searchDoctorExposureVal", "searchDoctorClickNum", "searchHospitalExposureVal", "searchHospitalClickNum")

      GmeiConfig.writeToJDBCTable(df_result, table = "GetHiveSearchData_CTR", SaveMode.Append)

    }


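    // For one search vertical: sums exposed list_ids from the backend view's
    // app['exposure_data'] payload via an accumulator, and counts click events
    // from the maidian view, excluding agency/blacklisted devices. Returns
    // (exposureCount, clickCount, mappedRowCount, nonEmptyRowCount).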
    def GetSearchResultData(spark: SparkSession, strExposureAction:String, strClickAction:String,stat_date:String) = {

      val partition_date = stat_date.replace("-","")

      val exposureAccum = spark.sparkContext.longAccumulator("search exposure data")

      val jigou_id = spark.sql(
        s"""
           |SELECT cl_id
           |FROM online.ml_hospital_spam_pv_day
           |WHERE partition_date>='20180402' AND partition_date<'${partition_date}'
           |AND pv_ratio>=0.95
           |UNION ALL
           |SELECT cl_id
           |FROM online.ml_hospital_spam_pv_month
           |WHERE partition_date>='20171101' AND partition_date<'${partition_date}'
           |AND pv_ratio>=0.95
           |UNION ALL
           |select device_id as cl_id from blacklist
       """.stripMargin
      )
      jigou_id.createOrReplaceTempView("jigou_id")

      val exposureSql = spark.sql(
        s"""
           |select action,user_id,city_id,app
           |from online.tl_hdfs_backend_view ov left join jigou_id
           |on ov.cl_id = jigou_id.cl_id
           |where  ov.action='$strExposureAction'
           |and ov.partition_date='${partition_date}'
           |and jigou_id.cl_id is null
    """.stripMargin
      )

      // Parse the backend view's "app" map and sum exposed list_ids into the accumulator.
      val exposureMapResult = exposureSql.rdd.map(row => {

        val rowAppFieldMap: Map[String, Any] = row.getAs[Map[String, Any]]("app")

        if (rowAppFieldMap.nonEmpty) {

          if (rowAppFieldMap.contains("exposure_data")) {

            val exposure_data_lists: List[Any] =
              JSON.parseFull(rowAppFieldMap("exposure_data").toString).get.asInstanceOf[List[Any]]

            if (exposure_data_lists.nonEmpty) {

              exposure_data_lists.foreach(exposure_item => {

                if (exposure_item != None && exposure_item.toString.nonEmpty) {
                  val exposureItemMap: Map[String, Any] = exposure_item.asInstanceOf[Map[String, Any]]

                  if (exposureItemMap.contains("list_ids")) {
                    val exposure_list_ids: List[Any] = exposureItemMap("list_ids").asInstanceOf[List[Any]]

                    exposureAccum.add(exposure_list_ids.length)
                  }
                } else {
                  None
                }
              })

              exposure_data_lists
            } else {
              None
            }

          } else {
            None // no exposure_data key: treat as a non-exposure row
          }
        } else {
          None
        }

      })

      // Must cache: the accumulator is updated inside the map above, and without
      // caching each count() below would rerun the map and double-count exposures.
      exposureMapResult.cache()
      val exposureFilterResult = exposureMapResult.filter(_ != None)

      val exposureMapCount: Long = exposureMapResult.count()
      val exposureFilterCount: Long = exposureFilterResult.count()

      val clickSql = spark.sql(
        s"""
           |select
           |count(1) click_num
           |from  online.tl_hdfs_maidian_view  ov left join jigou_id
           |on ov.cl_id = jigou_id.cl_id
           |where ov.partition_date='${partition_date}'
           |and ov.action='$strClickAction'
           |and jigou_id.cl_id is null
     """.stripMargin
      )

      val clickArray:Array[Row] = clickSql.collect()
      val click_num:Long = clickArray(0).getAs[Long]("click_num")


      (exposureAccum.value,click_num,exposureMapCount,exposureFilterCount)



    }

    }


  }




object find_reason {
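
  // Daily diagnosis of click/exposure changes: counts of new vs. old users,
  // clicking users in each group, and exposures for those clicking users.
  // Appended to the TiDB table device_clk_imp_reason.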

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String] ("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure_precise")

//      val stat_date = GmeiConfig.getMinusNDate(1)
      val stat_date = param.date
      val partition_date = stat_date.replace("-","")
      // agency (spam) device ids
      val blacklist = sc.sql(
        s"""
           |select device_id from blacklist
         """.stripMargin
      )
      blacklist.createOrReplaceTempView("blacklist")

      val agency_id = sc.sql(
        s"""
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_day
           |WHERE partition_date >= '20180402'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
           |UNION ALL
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_month
           |WHERE partition_date >= '20171101'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
         """.stripMargin
      )
      agency_id.show()
      agency_id.createOrReplaceTempView("agency_id")
      // Daily new users
      val device_id_newUser = sc.sql(
        s"""
           |select distinct(os.device_id) as device_id
           |from online.ml_device_day_active_status os left join blacklist
           |on os.device_id = blacklist.device_id
           |where os.active_type != '4'
           |and os.first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
           |    ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
           |    ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
           |    ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
           |    ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
           |    ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
           |    ,'promotion_shike','promotion_julang_jl03','','unknown')
           |and os.partition_date ='${partition_date}'
           |and blacklist.device_id is null
         """.stripMargin
      )
      device_id_newUser.show()
      device_id_newUser.createOrReplaceTempView("device_id_new")

      // Daily old users
      val device_id_oldUser = sc.sql(
        s"""
           |select distinct(os.device_id) as device_id
           |from online.ml_device_day_active_status os left join blacklist
           |on os.device_id=blacklist.device_id
           |where os.active_type = '4'
           |and os.first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
           |    ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
           |    ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
           |    ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
           |    ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
           |    ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
           |    ,'promotion_shike','promotion_julang_jl03','','unknown')
           |and os.partition_date ='${partition_date}'
           |and blacklist.device_id is null
         """.stripMargin
      )
      device_id_oldUser.show()
      device_id_oldUser.createOrReplaceTempView("device_id_old")



      val all_clk = sc.sql(
        s"""
           |select ov.cl_id as device_id
           |from online.tl_hdfs_maidian_view ov left join agency_id
           |on ov.cl_id = agency_id.device_id
           |where ov.action = 'on_click_diary_card'
           |and ov.cl_id != "NULL"
           |and ov.partition_date='${partition_date}'
           |and agency_id.device_id is null
       """.stripMargin
      )
      all_clk.show()
      all_clk.createOrReplaceTempView("all_clk_diary_card")

      // 1. Count of old users who clicked today
      val old_clk_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(distinct(oc.device_id)) as old_clk_count
           |from all_clk_diary_card oc inner join device_id_old
           |on oc.device_id = device_id_old.device_id
       """.stripMargin
      )
      old_clk_count.show()
      // 1.1 Old users with at least one click
      val old_clk_device = sc.sql(
        s"""
           |select distinct(oc.device_id) as device_id
           |from all_clk_diary_card oc inner join device_id_old
           |on oc.device_id = device_id_old.device_id
       """.stripMargin
      )
      old_clk_device.createOrReplaceTempView("old_clk_device")



      // 2. Count of new users who clicked today
      val new_clk_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(distinct(oc.device_id)) as new_clk_count
           |from all_clk_diary_card oc inner join device_id_new
           |on oc.device_id = device_id_new.device_id
       """.stripMargin
      )
      // 2.1 New users with at least one click
      val new_clk_device = sc.sql(
        s"""
           |select distinct(oc.device_id) as device_id
           |from all_clk_diary_card oc inner join device_id_new
           |on oc.device_id = device_id_new.device_id
       """.stripMargin
      )
      new_clk_device.createOrReplaceTempView("new_clk_device")


      // 3. Count of old users today

      val old_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(distinct(dio.device_id)) as old_count
           |from device_id_old dio left join agency_id
           |on dio.device_id = agency_id.device_id
           |where agency_id.device_id is null
       """.stripMargin
      )

      // 4. Count of new users today
      val new_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(distinct(din.device_id)) as new_count
           |from device_id_new din left join agency_id
           |on din.device_id = agency_id.device_id
           |where agency_id.device_id is null
       """.stripMargin
      )

      // 5. Exposure count of old users who clicked
      val exp_clkold_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(dp.device_id) as imp_clkold_count
           |from data_feed_exposure_precise dp inner join old_clk_device
           |on dp.device_id = old_clk_device.device_id
           |where stat_date='${stat_date}'
           |group by stat_date
       """.stripMargin
      )

      // 6. Exposure count of new users who clicked
      val exp_clknew_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(dp.device_id) as imp_clknew_count
           |from data_feed_exposure_precise dp inner join new_clk_device
           |on dp.device_id = new_clk_device.device_id
           |where stat_date='${stat_date}'
           |group by stat_date
       """.stripMargin
      )

      val result = old_clk_count.join(new_clk_count,"stat_date")
        .join(old_count,"stat_date")
        .join(new_count,"stat_date")
        .join(exp_clkold_count,"stat_date")
        .join(exp_clknew_count,"stat_date")

      GmeiConfig.writeToJDBCTable(result, "device_clk_imp_reason", SaveMode.Append)


    }

  }


}