package com.gmei

import java.io.Serializable

import com.gmei.WeafareStat.{defaultParams, parser}
import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams
import java.io._

/**
 * Ad-hoc analysis job that prints diary-detail page-view statistics to stdout.
 *
 * Devices found in the hospital-spam tables or in the TiDB `blacklist` table are
 * collected in the `final_id` temp view and excluded from every statistic.
 */
object temp_analysis {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  /**
   * Command-line options.
   *
   * @param env  environment name handed to `GmeiConfig.setup`
   * @param date accepted on the command line but not read by this job
   */
  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("temp_analysis")
    opt[String]("env")
      .text("the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text("the date you used")
      .action((x, c) => c.copy(date = x))
    // stripMargin is applied to the whole concatenation so that the margin
    // character of the appended "--env" line is stripped as well.
    note(
      ("""
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.temp_analysis ./target/scala-2.11/feededa-assembly-0.1.jar \
      """ + s"|   --env ${defaultParams.env}").stripMargin
    )
  }

  /**
   * Entry point: parses the arguments, builds the exclusion views and prints
   * the statistics. Exits with status 1 when the arguments cannot be parsed
   * (scopt has already printed the error and the usage text by then).
   */
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams) match {
      case Some(param) =>
        GmeiConfig.setup(param.env)
        val sparkEnv = GmeiConfig.getSparkSession()
        val spark = sparkEnv._2

        // Expose the TiDB tables to Spark SQL under their bare table names.
        val ti = new TiContext(spark)
        Seq(
          "jerry_prod" -> "diary_video",
          "jerry_prod" -> "data_feed_click",
          "jerry_prod" -> "blacklist",
          "jerry_test" -> "bl_device_list",
          "jerry_prod" -> "data_feed_exposure",
          "jerry_prod" -> "merge_queue_table"
        ).foreach { case (db, table) => ti.tidbMapTable(dbName = db, tableName = table) }

        // Only referenced by the disabled "daily new users" query further down.
        val stat_date = GmeiConfig.getMinusNDate(1)
        //println(param.date)
        val partition_date = stat_date.replace("-", "")

        // Devices flagged by the hospital-spam tables (pv_ratio >= 0.95);
        // the date window is pinned to fixed literals in this job.
        spark.sql(
          """
            |SELECT DISTINCT(cl_id) as device_id
            |FROM online.ml_hospital_spam_pv_day
            |WHERE partition_date >= '20180402'
            |AND partition_date <= '20181203'
            |AND pv_ratio >= 0.95
            |UNION ALL
            |SELECT DISTINCT(cl_id) as device_id
            |FROM online.ml_hospital_spam_pv_month
            |WHERE partition_date >= '20171101'
            |AND partition_date <= '20181203'
            |AND pv_ratio >= 0.95
          """.stripMargin
        ).createOrReplaceTempView("agency_id")


//      // Daily new users
//      val device_id_newUser = sc.sql(
//        s"""
//           |select distinct(device_id) as device_id
//           |from online.ml_device_day_active_status
//           |where active_type != '4'
//           |and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
//           |        ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
//           |        ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
//           |        ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
//           |        ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
//           |        ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
//           |        ,'promotion_shike','promotion_julang_jl03')
//           |and partition_date ='${partition_date}'
//         """.stripMargin
//      )
//      device_id_newUser.createOrReplaceTempView("device_id_new")

        spark.sql(
          """
            |SELECT device_id
            |from blacklist
          """.stripMargin
        ).createOrReplaceTempView("blacklist_id")

        // Union of both exclusion sources. Duplicates are harmless because the
        // view is only used for "left join ... is null" exclusion.
        spark.sql(
          """
            |select device_id
            |from agency_id
            |UNION ALL
            |select device_id
            |from blacklist_id
          """.stripMargin
        ).createOrReplaceTempView("final_id")

        // Diary-detail page views per day: total, distinct devices, views per device.
        val diaryClicksAll = spark.sql(
          """
            |select ov.partition_date,count(ov.cl_id) as clk_num,count(distinct(ov.cl_id)),count(ov.cl_id)/count(distinct(ov.cl_id))
            |from online.tl_hdfs_maidian_view ov left join final_id
            |on ov.cl_id = final_id.device_id
            |where ov.action = "page_view"
            |and params['page_name']="diary_detail"
            |and ov.cl_id != "NULL"
            |and ov.partition_date >='20181201'
            |and final_id.device_id is  null
            |group by ov.partition_date
            |order by ov.partition_date
          """.stripMargin
        )
        diaryClicksAll.show(80)

        // Diary clicks broken down by referrer page. The literal list contains
        // repeated entries; `distinct` keeps each referrer from being queried twice.
        val referrers = List("about_me_message_list", "all_case_service_comment", "all_cases", "diary_detail", "diary_list",
          "diary_listof_related_service", "answer_detail", "community_home", "conversation_detail", "create_diary_title",
          "diary_listof_related_service", "doctor_all_cases", "hospital_all_cases", "my_favor", "my_order", "order_detail",
          "personal_store_diary_list", "received_votes", "topic_detail", "welfare_detail", "welfare_list", "welfare_special",
          "wiki_detail", "zone_detail", "expert_detail", "free_activity_detail", "home", "message_home", "my_diary",
          "organization_detail", "other_homepage", "question_detail", "search_result_diary", "search_result_more",
          "welfare_detail", "zone_v3").distinct

        referrers.foreach { referrer =>
          val diaryClicksByReferrer = spark.sql(
            s"""
               |select ov.partition_date,count(ov.cl_id) as clk_num,count(distinct(ov.cl_id)),count(ov.cl_id)/count(distinct(ov.cl_id))
               |from online.tl_hdfs_maidian_view ov left join final_id
               |on ov.cl_id = final_id.device_id
               |where ov.action = "page_view"
               |and params['page_name']="diary_detail"
               |and params['referrer']='${referrer}'
               |and ov.cl_id != "NULL"
               |and ov.partition_date >='20181201'
               |and final_id.device_id is  null
               |group by ov.partition_date
               |order by ov.partition_date
             """.stripMargin
          )
          // Concatenate explicitly: println("来源:", x) would print a tuple.
          println("来源:" + referrer)
          diaryClicksByReferrer.show(80)
        }

        // 5. Distinct devices per day in the feed-exposure table.
        // The date filter and the "not excluded" check belong in WHERE: placed in
        // the ON clause of a LEFT JOIN they never remove a left-hand row, and
        // "a = b AND b IS NULL" can never match, so nothing was filtered at all.
        val loginDevices = spark.sql(
          """
            |select oe.stat_date,count(distinct(oe.device_id)) as log_num
            |from data_feed_exposure oe left join final_id
            |on oe.device_id = final_id.device_id
            |where oe.stat_date >='2018-11-01'
            |and final_id.device_id is null
            |group by oe.stat_date
            |order by oe.stat_date
          """.stripMargin
        )
        println("登录人数统计:")
        loginDevices.show(80)

      case None =>
        // Invalid arguments: signal failure to the caller / scheduler.
        sys.exit(1)
    }
  }

}






/**
 * Ad-hoc job that prints, for November 2018, the total paid amount with the
 * number of paying devices, and the number of monthly active devices.
 * These look like the inputs of an ARPU figure; the division itself is not
 * performed here.
 *
 * Devices found in the hospital-spam tables or in the TiDB `blacklist` table are
 * excluded from the payment statistics through the `final_id` temp view.
 */
object ARPU_COM {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  /**
   * Command-line options.
   *
   * @param env  environment name handed to `GmeiConfig.setup`
   * @param date accepted on the command line but not read by this job
   */
  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("ARPU_COM")
    opt[String]("env")
      .text("the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text("the date you used")
      .action((x, c) => c.copy(date = x))
    // stripMargin is applied to the whole concatenation so that the margin
    // character of the appended "--env" line is stripped as well.
    note(
      ("""
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.ARPU_COM ./target/scala-2.11/feededa-assembly-0.1.jar \
      """ + s"|   --env ${defaultParams.env}").stripMargin
    )
  }

  /**
   * Entry point: parses the arguments, builds the exclusion views and prints
   * both statistics. Exits with status 1 when the arguments cannot be parsed
   * (scopt has already printed the error and the usage text by then).
   */
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams) match {
      case Some(param) =>
        GmeiConfig.setup(param.env)
        val sparkEnv = GmeiConfig.getSparkSession()
        val spark = sparkEnv._2

        // Expose the TiDB tables to Spark SQL under their bare table names.
        val ti = new TiContext(spark)
        Seq(
          "jerry_prod" -> "diary_video",
          "jerry_prod" -> "data_feed_click",
          "jerry_prod" -> "blacklist",
          "jerry_test" -> "bl_device_list",
          "jerry_prod" -> "data_feed_exposure",
          "jerry_prod" -> "merge_queue_table"
        ).foreach { case (db, table) => ti.tidbMapTable(dbName = db, tableName = table) }

        // Upper bound of the spam-table window, in yyyyMMdd form.
        val stat_date = GmeiConfig.getMinusNDate(1)
        //println(param.date)
        val partition_date = stat_date.replace("-", "")

        // Devices flagged by the hospital-spam tables (pv_ratio >= 0.95).
        spark.sql(
          s"""
             |SELECT DISTINCT(cl_id) as device_id
             |FROM online.ml_hospital_spam_pv_day
             |WHERE partition_date >= '20180402'
             |AND partition_date <= '${partition_date}'
             |AND pv_ratio >= 0.95
             |UNION ALL
             |SELECT DISTINCT(cl_id) as device_id
             |FROM online.ml_hospital_spam_pv_month
             |WHERE partition_date >= '20171101'
             |AND partition_date <= '${partition_date}'
             |AND pv_ratio >= 0.95
           """.stripMargin
        ).createOrReplaceTempView("agency_id")

        spark.sql(
          """
            |SELECT device_id
            |from blacklist
          """.stripMargin
        ).createOrReplaceTempView("blacklist_id")

        // Union of both exclusion sources. Duplicates are harmless because the
        // view is only used for "left join ... is null" exclusion.
        spark.sql(
          """
            |select device_id
            |from agency_id
            |UNION ALL
            |select device_id
            |from blacklist_id
          """.stripMargin
        ).createOrReplaceTempView("final_id")

        // Paid amount and number of paying devices for November 2018.
        // The upper bound is half-open (< '2018-12-01'): with "<= '2018-11-30'"
        // any pay_time carrying a time of day on Nov 30 compares greater than the
        // literal and would be dropped. For date-only values both forms agree.
        val payStats = spark.sql(
          """
            |select sum(md.gengmei_price) as pay_all,count(distinct(md.device_id)) as consum_num
            |from online.ml_meigou_order_detail md left join final_id
            |on md.device_id = final_id.device_id
            |where md.status = 2
            |and final_id.device_id is null
            |and md.partition_date = '20181218'
            |and md.pay_time is not null
            |and md.pay_time >= '2018-11-01'
            |and md.pay_time < '2018-12-01'
          """.stripMargin
        )
        payStats.show(80)

        // Monthly active devices, leaving out the listed first-channel sources.
        val activeDevices = spark.sql(
          """
            |select count(distinct(device_id)) as active_num
            |from online.ml_device_month_active_status
            |where partition_date = '20181130'
            |and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3','wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang','js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1' ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4','TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100' ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ' ,'promotion_shike','promotion_julang_jl03')
          """.stripMargin
        )
        activeDevices.show(80)

      case None =>
        // Invalid arguments: signal failure to the caller / scheduler.
        sys.exit(1)
    }
  }

}




/**
 * One-off export job: reads id, name, location and city_id from the
 * 2018-12-19 partition of `online.tl_hdfs_hospital_view`, prints a sample and
 * hands the rows to `GmeiConfig.writeToJDBCTable` for the `hospital_gengmei`
 * table.
 */
object hospital_gengmei {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  /**
   * Command-line options.
   *
   * @param env  environment name handed to `GmeiConfig.setup`
   * @param date accepted on the command line but not read by this job
   */
  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("hospital_gengmei")
    opt[String]("env")
      .text("the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text("the date you used")
      .action((x, c) => c.copy(date = x))
    // stripMargin is applied to the whole concatenation so that the margin
    // character of the appended "--env" line is stripped as well.
    note(
      ("""
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.hospital_gengmei ./target/scala-2.11/feededa-assembly-0.1.jar \
      """ + s"|   --env ${defaultParams.env}").stripMargin
    )
  }

  /**
   * Entry point: parses the arguments, runs the export query and writes the
   * result. Exits with status 1 when the arguments cannot be parsed (scopt has
   * already printed the error and the usage text by then).
   */
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams) match {
      case Some(param) =>
        GmeiConfig.setup(param.env)
        val sparkEnv = GmeiConfig.getSparkSession()
        val spark = sparkEnv._2

        // Expose the TiDB tables to Spark SQL under their bare table names.
        val ti = new TiContext(spark)
        Seq(
          "jerry_prod" -> "diary_video",
          "jerry_prod" -> "data_feed_click",
          "jerry_prod" -> "blacklist",
          "jerry_test" -> "bl_device_list",
          "jerry_prod" -> "data_feed_exposure",
          "jerry_prod" -> "merge_queue_table"
        ).foreach { case (db, table) => ti.tidbMapTable(dbName = db, tableName = table) }

        val hospitals = spark.sql(
          """
            |SELECT id,name,location,city_id
            |FROM online.tl_hdfs_hospital_view
            |WHERE partition_date = '20181219'
          """.stripMargin
        )
        hospitals.show()

        // NOTE(review): SaveMode.Append presumably adds the same rows again on
        // every re-run — confirm that the target table is expected to tolerate that.
        GmeiConfig.writeToJDBCTable(hospitals, "hospital_gengmei", SaveMode.Append)

      case None =>
        // Invalid arguments: signal failure to the caller / scheduler.
        sys.exit(1)
    }
  }

}






/**
 * Ad-hoc job that defines a per-user paid-amount query over
 * `online.ml_meigou_order_detail`, leaving out orders listed by the
 * `mining.ml_order_spam_recognize` sub-query.
 *
 * Both output steps (`show` and the JDBC write) are currently commented out, so
 * no action is invoked on the resulting DataFrame.
 */
object meigou_xiaofei_renshu {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  /**
   * Command-line options.
   *
   * @param env  environment name handed to `GmeiConfig.setup`
   * @param date accepted on the command line but not read by this job
   */
  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("meigou_xiaofei_renshu")
    opt[String]("env")
      .text("the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text("the date you used")
      .action((x, c) => c.copy(date = x))
    // stripMargin is applied to the whole concatenation so that the margin
    // character of the appended "--env" line is stripped as well.
    note(
      ("""
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.meigou_xiaofei_renshu ./target/scala-2.11/feededa-assembly-0.1.jar \
      """ + s"|   --env ${defaultParams.env}").stripMargin
    )
  }

  /**
   * Entry point: parses the arguments, registers the exclusion views and
   * defines the payment query. Exits with status 1 when the arguments cannot be
   * parsed (scopt has already printed the error and the usage text by then).
   */
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams) match {
      case Some(param) =>
        GmeiConfig.setup(param.env)
        val sparkEnv = GmeiConfig.getSparkSession()
        val spark = sparkEnv._2

        // Expose the TiDB tables to Spark SQL under their bare table names.
        val ti = new TiContext(spark)
        Seq(
          "jerry_prod" -> "diary_video",
          "jerry_prod" -> "data_feed_click",
          "jerry_prod" -> "blacklist",
          "jerry_test" -> "bl_device_list",
          "jerry_prod" -> "data_feed_exposure",
          "jerry_prod" -> "merge_queue_table"
        ).foreach { case (db, table) => ti.tidbMapTable(dbName = db, tableName = table) }

        // Upper bound of the spam-table window, in yyyyMMdd form.
        val stat_date = GmeiConfig.getMinusNDate(1)
        //println(param.date)
        val partition_date = stat_date.replace("-", "")

        // Devices flagged by the hospital-spam tables (pv_ratio >= 0.95).
        spark.sql(
          s"""
             |SELECT DISTINCT(cl_id) as device_id
             |FROM online.ml_hospital_spam_pv_day
             |WHERE partition_date >= '20180402'
             |AND partition_date <= '${partition_date}'
             |AND pv_ratio >= 0.95
             |UNION ALL
             |SELECT DISTINCT(cl_id) as device_id
             |FROM online.ml_hospital_spam_pv_month
             |WHERE partition_date >= '20171101'
             |AND partition_date <= '${partition_date}'
             |AND pv_ratio >= 0.95
           """.stripMargin
        ).createOrReplaceTempView("agency_id")

        spark.sql(
          """
            |SELECT device_id
            |from blacklist
          """.stripMargin
        ).createOrReplaceTempView("blacklist_id")

        // Union of both exclusion sources; only the disabled query variant
        // below reads this view.
        spark.sql(
          """
            |select device_id
            |from agency_id
            |UNION ALL
            |select device_id
            |from blacklist_id
          """.stripMargin
        ).createOrReplaceTempView("final_id")


//      // Earlier variant: exclusion by device (final_id) instead of by spam order.
//      val meigou_price = sc.sql(
//        s"""
//           |select  md.user_id,sum(md.gengmei_price) as pay_all
//           |from online.ml_meigou_order_detail md left join final_id
//           |on md.device_id = final_id.device_id
//           |where md.status= 2
//           |and final_id.device_id is  null
//           |and md.partition_date = '20181223'
//           |and  md.pay_time  is  not  null
//           |and  md.validate_time>'2017-01-01 00:00:00.0'
//           |group by md.user_id
//           |order by sum(md.gengmei_price)
//       """.stripMargin
//      )
//      meigou_price.show(80)

        // Paid amount per user; an order is kept only when the spam sub-query
        // has no row for its order_id.
        val meigou_price = spark.sql(
          """
            |select md.user_id,sum(md.gengmei_price) as pay_all
            |from online.ml_meigou_order_detail md
            |left join
            |(
            |        SELECT
            |        order_id
            |        FROM mining.ml_order_spam_recognize
            |        WHERE partition_date='20181223' AND
            |        self_support=0 AND dayBitsGetW1(predict_result,'20181223')=0
            |)spam
            |on md.order_id = spam.order_id
            |where md.status = 2
            |and spam.order_id is null
            |and md.partition_date = '20181223'
            |and md.pay_time is not null
            |and md.validate_time>'2017-01-01 00:00:00.0'
            |group by md.user_id
            |order by sum(md.gengmei_price)
          """.stripMargin
        )
//      meigou_price.show(80)

//      GmeiConfig.writeToJDBCTable(meigou_price, "meigou_price", SaveMode.Overwrite)

      case None =>
        // Invalid arguments: signal failure to the caller / scheduler.
        sys.exit(1)
    }
  }

}




/**
 * Ad-hoc job that computes, per city, for November 2018:
 *  - welfare-detail page views and distinct viewing devices,
 *  - consultation clicks (`welfare_detail_click_message`) and distinct devices,
 *    both restricted to events where the device's city equals the city of the
 *    viewed service,
 *  - the number of distinct devices from `online.ml_device_day_active_status`
 *    with `active_type != '4'`.
 *
 * The combined rows are printed and handed to `GmeiConfig.writeToJDBCTable`
 * for the `smart_rank_count` table. Devices found in the hospital-spam tables
 * or in the TiDB `blacklist` table are excluded from the event statistics.
 */
object smart_rank_count {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  /**
   * Command-line options.
   *
   * @param env  environment name handed to `GmeiConfig.setup`
   * @param date accepted on the command line but not read by this job
   */
  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("smart_rank_count")
    opt[String]("env")
      .text("the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text("the date you used")
      .action((x, c) => c.copy(date = x))
    // stripMargin is applied to the whole concatenation so that the margin
    // character of the appended "--env" line is stripped as well.
    note(
      ("""
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.smart_rank_count ./target/scala-2.11/feededa-assembly-0.1.jar \
      """ + s"|   --env ${defaultParams.env}").stripMargin
    )
  }

  /**
   * Entry point: parses the arguments, builds the intermediate temp views,
   * prints the combined result and writes it out. Exits with status 1 when the
   * arguments cannot be parsed (scopt has already printed the error and the
   * usage text by then).
   */
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams) match {
      case Some(param) =>
        GmeiConfig.setup(param.env)
        val sparkEnv = GmeiConfig.getSparkSession()
        val spark = sparkEnv._2

        // Expose the TiDB tables to Spark SQL under their bare table names.
        val ti = new TiContext(spark)
        Seq(
          "jerry_prod" -> "diary_video",
          "jerry_prod" -> "data_feed_click",
          "jerry_prod" -> "blacklist",
          "jerry_test" -> "bl_device_list",
          "jerry_prod" -> "data_feed_exposure",
          "jerry_prod" -> "merge_queue_table"
        ).foreach { case (db, table) => ti.tidbMapTable(dbName = db, tableName = table) }

        // Upper bound of the spam-table window, in yyyyMMdd form.
        val stat_date = GmeiConfig.getMinusNDate(1)
        //println(param.date)
        val partition_date = stat_date.replace("-", "")

        // Devices flagged by the hospital-spam tables (pv_ratio >= 0.95).
        spark.sql(
          s"""
             |SELECT DISTINCT(cl_id) as device_id
             |FROM online.ml_hospital_spam_pv_day
             |WHERE partition_date >= '20180402'
             |AND partition_date <= '${partition_date}'
             |AND pv_ratio >= 0.95
             |UNION ALL
             |SELECT DISTINCT(cl_id) as device_id
             |FROM online.ml_hospital_spam_pv_month
             |WHERE partition_date >= '20171101'
             |AND partition_date <= '${partition_date}'
             |AND pv_ratio >= 0.95
           """.stripMargin
        ).createOrReplaceTempView("agency_id")

        spark.sql(
          """
            |SELECT device_id
            |from blacklist
          """.stripMargin
        ).createOrReplaceTempView("blacklist_id")

        // Union of both exclusion sources. Duplicates are harmless because the
        // view is only used for "left join ... is null" exclusion.
        spark.sql(
          """
            |select device_id
            |from agency_id
            |UNION ALL
            |select device_id
            |from blacklist_id
          """.stripMargin
        ).createOrReplaceTempView("final_id")

        // Welfare-detail page views in November 2018 by non-excluded devices.
        spark.sql(
          """
            |select ov.cl_id as device_id,ov.city_id as device_city,ov.params['business_id'] as meigou_id
            |from online.tl_hdfs_maidian_view ov left join final_id
            |on ov.cl_id = final_id.device_id
            |where ov.action = "page_view"
            |and ov.params['page_name']="welfare_detail"
            |and ov.partition_date >='20181101'
            |and ov.partition_date <'20181201'
            |and ov.city_id is not null
            |and final_id.device_id  is null
          """.stripMargin
        ).createOrReplaceTempView("user_city_meigou_view")

        // City of each service, resolved through its doctor and that doctor's hospital.
        spark.sql(
          """
            |select b.id as meigou_id,d.city_id as meigou_city
            |from online.tl_meigou_service_view b
            |left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id
            |left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id
            |where b.partition_date='20181228'
            |and c.partition_date='20181228'
            |and d.partition_date='20181228'
          """.stripMargin
        ).createOrReplaceTempView("meigou_city")

        // Attach the service city to every page view.
        spark.sql(
          """
            |select a.device_id,a.device_city,a.meigou_id,b.meigou_city
            |from user_city_meigou_view a
            |left join meigou_city b
            |on a.meigou_id = b.meigou_id
          """.stripMargin
        ).createOrReplaceTempView("meigou_pv_tongcheng")

        // Same-city page views and distinct devices per city.
        spark.sql(
          """
            |select '2018-11' as stat_date,meigou_city,count(device_id) as meigou_pv,count(distinct(device_id)) as meigou_device_num
            |from meigou_pv_tongcheng
            |where device_city = meigou_city
            |group by meigou_city
          """.stripMargin
        ).createOrReplaceTempView("meigou_pv_count")

        // Consultation statistics start here: consultation clicks in November
        // 2018 by non-excluded devices.
        spark.sql(
          """
            |select ov.cl_id as device_id,ov.city_id as device_city,ov.params['service_id'] as meigou_id
            |from online.tl_hdfs_maidian_view ov left join final_id
            |on ov.cl_id = final_id.device_id
            |where ov.partition_date >= '20181101'
            |and ov.partition_date < '20181201'
            |and ov.action = 'welfare_detail_click_message'
            |and final_id.device_id is null
          """.stripMargin
        ).createOrReplaceTempView("zixun_meigou_view")

        // Attach the service city to every consultation click.
        spark.sql(
          """
            |select a.device_id,a.device_city,a.meigou_id,b.meigou_city
            |from zixun_meigou_view a
            |left join meigou_city b
            |on a.meigou_id=b.meigou_id
          """.stripMargin
        ).createOrReplaceTempView("zixun_meigou_tongcheng")

        // Same-city consultation clicks and distinct devices per city.
        spark.sql(
          """
            |select '2018-11' as stat_date,meigou_city,count(device_id) as meigou_zixun,count(distinct(device_id)) as meigou_zixun_device_num
            |from zixun_meigou_tongcheng
            |where device_city=meigou_city
            |group by meigou_city
          """.stripMargin
        ).createOrReplaceTempView("zixun_pv_count")

        // Newly added devices per region for the month.
        spark.sql(
          """
            |select first_city,count(distinct(device_id)) as new_device_month
            |from online.ml_device_day_active_status
            |where active_type != '4'
            |and partition_date >='20181101'
            |and partition_date <'20181201'
            |group by first_city
          """.stripMargin
        ).createOrReplaceTempView("device_new_count")

        // Combine all per-city figures into one result.
        val allCounts = spark.sql(
          """
            |select mc.stat_date,mc.meigou_city,mc.meigou_pv,mc.meigou_device_num,zc.meigou_zixun,zc.meigou_zixun_device_num,dc.new_device_month
            |from meigou_pv_count mc
            |left join zixun_pv_count zc on mc.meigou_city = zc.meigou_city
            |left join device_new_count dc on dc.first_city=mc.meigou_city
          """.stripMargin
        )
        allCounts.show()

        // NOTE(review): SaveMode.Append presumably adds the same rows again on
        // every re-run — confirm that the target table is expected to tolerate that.
        GmeiConfig.writeToJDBCTable(allCounts, "smart_rank_count", SaveMode.Append)

      case None =>
        // Invalid arguments: signal failure to the caller / scheduler.
        sys.exit(1)
    }
  }

}



// Topic-related question statistics


/**
 * Ad-hoc job that prints, per day from 2019-01-01 on, how many
 * `community_home_click_feed_card` events had `card_type` "问题" (question).
 *
 * Devices found in the hospital-spam tables or in the TiDB `blacklist` table
 * are excluded.
 */
object question_count {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  /**
   * Command-line options.
   *
   * @param env  environment name handed to `GmeiConfig.setup`
   * @param date accepted on the command line but not read by this job
   */
  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("question_count")
    opt[String]("env")
      .text("the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text("the date you used")
      .action((x, c) => c.copy(date = x))
    // stripMargin is applied to the whole concatenation so that the margin
    // character of the appended "--env" line is stripped as well.
    note(
      ("""
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.question_count ./target/scala-2.11/feededa-assembly-0.1.jar \
      """ + s"|   --env ${defaultParams.env}").stripMargin
    )
  }

  /**
   * Entry point: parses the arguments, builds the spam-device view and prints
   * the daily counts. Exits with status 1 when the arguments cannot be parsed
   * (scopt has already printed the error and the usage text by then).
   */
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams) match {
      case Some(param) =>
        GmeiConfig.setup(param.env)
        val sparkEnv = GmeiConfig.getSparkSession()
        val spark = sparkEnv._2

        // Expose the TiDB tables to Spark SQL under their bare table names.
        val ti = new TiContext(spark)
        Seq(
          "jerry_prod" -> "diary_video",
          "jerry_prod" -> "data_feed_click",
          "jerry_prod" -> "blacklist",
          "jerry_test" -> "bl_device_list",
          "jerry_prod" -> "data_feed_exposure",
          "jerry_prod" -> "merge_queue_table"
        ).foreach { case (db, table) => ti.tidbMapTable(dbName = db, tableName = table) }

        // Devices flagged by the hospital-spam tables (pv_ratio >= 0.95);
        // the date window is pinned to fixed literals in this job.
        spark.sql(
          """
            |SELECT DISTINCT(cl_id) as device_id
            |FROM online.ml_hospital_spam_pv_day
            |WHERE partition_date >= '20180402'
            |AND partition_date <= '20190117'
            |AND pv_ratio >= 0.95
            |UNION ALL
            |SELECT DISTINCT(cl_id) as device_id
            |FROM online.ml_hospital_spam_pv_month
            |WHERE partition_date >= '20171101'
            |AND partition_date <= '20190117'
            |AND pv_ratio >= 0.95
          """.stripMargin
        ).createOrReplaceTempView("agency_id")

        // Blacklisted devices are removed with LEFT JOIN ... IS NULL instead of
        // "cl_id NOT IN (subquery)": NOT IN yields no rows at all as soon as the
        // subquery returns a single NULL, and it forces a null-aware anti join.
        // "ov.cl_id is not null" keeps the NOT IN behaviour of dropping events
        // without a device id. All columns are qualified with `ov` so that the
        // joined tables cannot make a name ambiguous.
        val questionClicks = spark.sql(
          """
            |SELECT ov.partition_date,count(ov.cl_id)
            |FROM online.tl_hdfs_maidian_view ov
            |left join agency_id on ov.cl_id = agency_id.device_id
            |left join blacklist bl on ov.cl_id = bl.device_id
            |WHERE ov.partition_date >= '20190101'
            |and ov.action='community_home_click_feed_card'
            |and ov.params["card_type"]="问题"
            |and ov.cl_id is not null
            |and agency_id.device_id is null
            |and bl.device_id is null
            |GROUP BY ov.partition_date
            |order by ov.partition_date
          """.stripMargin
        )

        questionClicks.show(30)

      case None =>
        // Invalid arguments: signal failure to the caller / scheduler.
        sys.exit(1)
    }
  }

}