package com.gmei

import java.io.Serializable

import com.gmei.WeafareStat.{defaultParams, parser}
import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams
import com.gmei.GmeiConfig.{writeToJDBCTable,getMinusNDate}

/**
  * Spark batch job that counts, separately for "new" and "old" daily-active
  * devices, five kinds of activity for one statistics date and appends the
  * resulting single row to the JDBC table `diary_meigou_crv`:
  *
  *  1. page views of `welfare_detail` reached from `diary_detail`
  *  2. `on_click_diary_card` events
  *  3. exposures with `business_type = "diary"`
  *  4. search events (`do_search` / `search_result_click_search`)
  *  5. distinct devices present in `data_feed_exposure`
  *
  * Devices listed in the hospital-spam tables (`pv_ratio >= 0.95`) and devices
  * in the `blacklist` table are excluded from every metric.
  */
object testt {

  // Reduce Spark's own logging to warnings and silence the embedded Jetty server.
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  // Jetty's packages live under `org.eclipse.jetty`; the previous name
  // ("org.apache.eclipse.jetty.server") matched no logger, so the setting was a no-op.
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

  /**
    * Command-line parameters.
    *
    * @param env  environment name handed to `GmeiConfig.setup`
    * @param date date option accepted on the command line; currently not used
    *             by the job (see the note in `run`)
    */
  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams: Params = Params()

  /** scopt parser for [[Params]]. */
  val parser: OptionParser[Params] = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text("the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text("the date you used")
      .action((x, c) => c.copy(date = x))
    // stripMargin must be applied to the whole concatenation: a method call
    // binds tighter than `+`, so calling it on the first literal only would
    // leave a literal '|' in front of the `--env` line of the help text.
    note(
      ("""
         |For example, the following command runs this app on a tidb dataset:
         |
         | spark-submit --class com.gmei.testt ./target/scala-2.11/feededa-assembly-0.1.jar \
         |""" + s"   --env ${defaultParams.env}").stripMargin
    )
  }

  /** Channel sources whose devices are left out of the new/old device cohorts. */
  private val ExcludedChannels: Seq[String] = Seq(
    "yqxiu1", "yqxiu2", "yqxiu3", "yqxiu4", "yqxiu5", "mxyc1", "mxyc2", "mxyc3",
    "wanpu", "jinshan", "jx", "maimai", "zhuoyi", "huatian", "suopingjingling", "mocha", "mizhe", "meika", "lamabang",
    "js-az1", "js-az2", "js-az3", "js-az4", "js-az5", "jfq-az1", "jfq-az2", "jfq-az3", "jfq-az4", "jfq-az5", "toufang1",
    "toufang2", "toufang3", "toufang4", "toufang5", "toufang6", "TF-toufang1", "TF-toufang2", "TF-toufang3", "TF-toufang4",
    "TF-toufang5", "tf-toufang1", "tf-toufang2", "tf-toufang3", "tf-toufang4", "tf-toufang5", "benzhan", "promotion_aso100",
    "promotion_qianka", "promotion_xiaoyu", "promotion_dianru", "promotion_malioaso", "promotion_malioaso-shequ",
    "promotion_shike", "promotion_julang_jl03"
  )

  /** [[ExcludedChannels]] rendered as the body of a SQL `in (...)` list. */
  private val ExcludedChannelsSql: String =
    ExcludedChannels.map(channel => s"'$channel'").mkString(",")

  /**
    * Entry point: parses the command line and runs the job.
    *
    * Exits with status 1 when the arguments cannot be parsed (scopt has
    * already reported the problem), so a scheduler does not mistake a
    * mis-invocation for a successful run.
    */
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams) match {
      case Some(param) => run(param)
      case None        => sys.exit(1)
    }
  }

  /** Builds all intermediate views, computes the ten counters and writes the result row. */
  private def run(param: Params): Unit = {
    GmeiConfig.setup(param.env)
    val spark = GmeiConfig.getSparkSession()._2

    val ti = new TiContext(spark)
    Seq("diary_video", "data_feed_click", "blacklist", "data_feed_exposure")
      .foreach(table => ti.tidbMapTable(dbName = "jerry_prod", tableName = table))

    // NOTE(review): the `--date` option (param.date) is parsed but not used; the
    // statistics date always comes from GmeiConfig.getMinusNDate(1) — presumably
    // yesterday in yyyy-MM-dd form; confirm against GmeiConfig.
    val statDate = GmeiConfig.getMinusNDate(1)
    val partitionDate = statDate.replace("-", "")

    // ---- local helpers ---------------------------------------------------

    // Runs `query` and registers its result as the temp view `viewName`.
    def registerView(viewName: String, query: String): Unit =
      spark.sql(query).createOrReplaceTempView(viewName)

    // Registers `targetView` = rows of `sourceView` whose device is not blacklisted.
    def dropBlacklisted(sourceView: String, targetView: String): Unit =
      registerView(targetView,
        s"""
           |select dt.device_id
           |from ${sourceView} dt left join blacklist
           |on dt.device_id = blacklist.device_id
           |where blacklist.device_id is null
         """.stripMargin
      )

    // Distinct devices of the day whose active_type satisfies `activeTypePredicate`
    // and whose first channel source is not in the excluded list.
    def dailyActiveDevices(activeTypePredicate: String) = spark.sql(
      s"""
         |select distinct(device_id) as device_id
         |from online.ml_device_day_active_status
         |where active_type ${activeTypePredicate}
         |and first_channel_source_type not in (${ExcludedChannelsSql})
         |and partition_date ='${partitionDate}'
       """.stripMargin
    )

    // One-row frame (stat_date, <metric>) counting the rows of `sourceView` whose
    // device belongs to `cohortView`. The left join followed by the `is not null`
    // filter keeps only matching rows, as in the original queries.
    def countByCohort(sourceView: String,
                      cohortView: String,
                      metric: String,
                      distinctDevices: Boolean = false) = {
      val counted =
        if (distinctDevices) "count(distinct(src.device_id))" else "count(src.device_id)"
      spark.sql(
        s"""
           |select '${statDate}' as stat_date, ${counted} as ${metric}
           |from ${sourceView} src left join ${cohortView}
           |on src.device_id = ${cohortView}.device_id
           |where ${cohortView}.device_id is not null
         """.stripMargin
      )
    }

    // ---- exclusion lists -------------------------------------------------

    // Replace the mapped `blacklist` table by a device_id-only temp view.
    registerView("blacklist",
      s"""
         |select device_id from blacklist
       """.stripMargin
    )

    // Institution (agency) devices: flagged by the daily or monthly hospital-spam tables.
    val agencyDevices = spark.sql(
      s"""
         |SELECT DISTINCT(cl_id) as device_id
         |FROM online.ml_hospital_spam_pv_day
         |WHERE partition_date >= '20180402'
         |AND partition_date <= '${partitionDate}'
         |AND pv_ratio >= 0.95
         |UNION ALL
         |SELECT DISTINCT(cl_id) as device_id
         |FROM online.ml_hospital_spam_pv_month
         |WHERE partition_date >= '20171101'
         |AND partition_date <= '${partitionDate}'
         |AND pv_ratio >= 0.95
       """.stripMargin
    )
    agencyDevices.show() // debug preview; runs an extra Spark job
    agencyDevices.createOrReplaceTempView("agency_id")

    // ---- device cohorts ----------------------------------------------------

    // Daily new users: active_type other than '4'.
    val newDevices = dailyActiveDevices("!= '4'")
    newDevices.show() // debug preview; runs an extra Spark job
    newDevices.createOrReplaceTempView("device_id_new")

    // Daily old users: active_type '4'.
    val oldDevices = dailyActiveDevices("= '4'")
    oldDevices.show() // debug preview; runs an extra Spark job
    oldDevices.createOrReplaceTempView("device_id_old")

    // ---- 1. diary -> welfare (meigou) detail page conversions --------------

    registerView("diary_meigou_temp",
      s"""
         |select ou.cl_id as device_id
         |from online.bl_hdfs_page_view_updates ou left join agency_id
         |on ou.cl_id = agency_id.device_id
         |where ou.partition_date = '${partitionDate}'
         |and ou.page_name='welfare_detail'
         |and ou.referrer='diary_detail'
         |and agency_id.device_id is  null
       """.stripMargin
    )
    dropBlacklisted("diary_meigou_temp", "diary_meigou_device")
    val diaryMeigouNew = countByCohort("diary_meigou_device", "device_id_new", "diary_meigou_newUser")
    val diaryMeigouOld = countByCohort("diary_meigou_device", "device_id_old", "diary_meigou_oldUser")

    // ---- 2. diary card clicks ----------------------------------------------

    registerView("diary_clk_temp",
      s"""
         |select ov.cl_id as device_id
         |from online.tl_hdfs_maidian_view ov left join agency_id
         |on ov.cl_id = agency_id.device_id
         |where ov.action = 'on_click_diary_card'
         |and ov.cl_id != "NULL"
         |and ov.partition_date='${partitionDate}'
         |and agency_id.device_id is  null
       """.stripMargin
    )
    dropBlacklisted("diary_clk_temp", "diary_clk_device")
    val diaryClkNew = countByCohort("diary_clk_device", "device_id_new", "diary_clk_newUser")
    val diaryClkOld = countByCohort("diary_clk_device", "device_id_old", "diary_clk_oldUser")

    // ---- 3. diary exposures ------------------------------------------------

    registerView("diary_expoure_temp",
      s"""
         |select od.cl_id as device_id
         |from online.ml_community_exposure_detail od left join agency_id
         |on od.cl_id = agency_id.device_id
         |where od.business_type = "diary"
         |and od.cl_id != "NULL"
         |and od.partition_date='${partitionDate}'
         |and agency_id.device_id is  null
       """.stripMargin
    )
    dropBlacklisted("diary_expoure_temp", "diary_expoure_device")
    val diaryExpNew = countByCohort("diary_expoure_device", "device_id_new", "diary_exp_newUser")
    val diaryExpOld = countByCohort("diary_expoure_device", "device_id_old", "diary_exp_oldUser")

    // ---- 4. searches -------------------------------------------------------

    registerView("search_device_temp",
      s"""
         |select ov.cl_id as device_id
         |from online.tl_hdfs_maidian_view ov left join agency_id
         |on ov.cl_id = agency_id.device_id
         |where (ov.action = 'do_search' or ov.action = 'search_result_click_search')
         |and ov.partition_date ='${partitionDate}'
         |and agency_id.device_id is  null
       """.stripMargin
    )
    dropBlacklisted("search_device_temp", "search_device")
    val searchNew = countByCohort("search_device", "device_id_new", "search_newUser")
    val searchOld = countByCohort("search_device", "device_id_old", "search_oldUser")

    // ---- 5. logged-in devices ----------------------------------------------

    // The date and agency predicates must be in WHERE. Placed in the ON clause of
    // a LEFT JOIN (as they previously were) they filter nothing: every row of
    // data_feed_exposure, for every date and including agency devices, is kept.
    registerView("log_device_temp",
      s"""
         |select distinct(oe.device_id)  as device_id
         |from data_feed_exposure oe left join agency_id
         |on oe.device_id = agency_id.device_id
         |where oe.stat_date ='${statDate}'
         |and agency_id.device_id is  null
       """.stripMargin
    )
    dropBlacklisted("log_device_temp", "log_device")
    val logNew = countByCohort("log_device", "device_id_new", "log_newUser", distinctDevices = true)
    val logOld = countByCohort("log_device", "device_id_old", "log_oldUser", distinctDevices = true)

    // ---- assemble and persist ----------------------------------------------

    // Every counter frame has exactly one row keyed by the same stat_date, so
    // joining on it yields a single wide row.
    val result = diaryMeigouNew
      .join(diaryMeigouOld, "stat_date")
      .join(diaryClkNew, "stat_date")
      .join(diaryClkOld, "stat_date")
      .join(diaryExpNew, "stat_date")
      .join(diaryExpOld, "stat_date")
      .join(searchNew, "stat_date")
      .join(searchOld, "stat_date")
      .join(logNew, "stat_date")
      .join(logOld, "stat_date")

    GmeiConfig.writeToJDBCTable(result, "diary_meigou_crv", SaveMode.Append)
  }

}