package com.gmei

import java.io.Serializable

import org.apache.spark.sql.SaveMode
//import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams
import java.io._
import scala.util.parsing.json._


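// temp_analysis: ad-hoc stats job. It builds an exclusion set of devices (suspected agency
// devices plus the manual blacklist), selects the stat date's new users outside that set, and
// writes their diary-card clicks on the home "精选" tab to the jerry_prod temp table over JDBC.
// Example invocation (illustrative values; the built-in note still shows the WeafareStat example):
//   spark-submit --class com.gmei.temp_analysis ./target/scala-2.11/feededa-assembly-0.1.jar --env prod --date 2018-12-03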
object temp_analysis {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String] ("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      //      val ti = new TiContext(sc)
      sc.sql("use jerry_prod")

      import sc.implicits._
//      val stat_date = GmeiConfig.getMinusNDate(1)
      val stat_date=param.date
      //println(param.date)
      val partition_date = stat_date.replace("-","")

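      // Suspected agency/spam devices: hospital-page pv_ratio >= 0.95 in either the daily or the
      // monthly spam PV table.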
      val agency_id = sc.sql(
        s"""
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_day
           |WHERE partition_date >= '20180402'
           |AND partition_date <= '20181203'
           |AND pv_ratio >= 0.95
           |UNION ALL
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_month
           |WHERE partition_date >= '20171101'
           |AND partition_date <= '20181203'
           |AND pv_ratio >= 0.95
         """.stripMargin
      )
      agency_id.createOrReplaceTempView("agency_id")

      val blacklist_id = sc.sql(
        s"""
           |SELECT device_id
           |from blacklist
         """.stripMargin
      )
      blacklist_id.createOrReplaceTempView("blacklist_id")

      val final_id = sc.sql(
        s"""
           |select device_id
           |from agency_id
           |UNION ALL
           |select device_id
           |from blacklist_id
         """.stripMargin
      )
      final_id.createOrReplaceTempView("final_id")


      // Daily new users (active_type 1 or 2) on the stat date, excluding promotion channels
      // and any device in final_id.
      val device_id_newUser = sc.sql(
        s"""
           |select distinct(oms.device_id) as device_id
           |from online.ml_device_day_active_status oms left join final_id
           |on oms.device_id=final_id.device_id
           |where (oms.active_type = '2' or oms.active_type='1')
           |and oms.first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
           |        ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
           |        ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
           |        ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
           |        ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
           |        ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
           |        ,'promotion_shike','promotion_julang_jl03','promotion_zuimei')
           |and oms.partition_date ='${partition_date}'
           |and final_id.device_id is  null
         """.stripMargin
      )
      device_id_newUser.createOrReplaceTempView("device_id_new")




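      // Diary-card clicks on the home "精选" tab made by the new devices selected above, one row
      // per click with its diary_id.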
      val diary_clk_new = sc.sql(
        s"""
           |select ov.partition_date,ov.cl_id as device_id,ov.params['diary_id'] as diary_id
           |from online.tl_hdfs_maidian_view ov inner join device_id_new
           |on ov.cl_id = device_id_new.device_id
           |where ov.action = 'on_click_diary_card'
           |and ov.params['tab_name'] = '精选'
           |and ov.params['page_name'] = 'home'
           |and ov.partition_date='${partition_date}'
       """.stripMargin
      )
      diary_clk_new.show(80)
      GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",diary_clk_new, table="temp",SaveMode.Append)
      println("写入完成")


      }


    }


}






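// ARPU_COM: one-off ARPU-style figures. It sums gengmei_price and counts distinct paying devices
// for status-2 orders paid in November 2018 (agency/blacklisted devices excluded), and counts
// November monthly active devices outside the promotion channels; results are only printed with show().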
object ARPU_COM {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String] ("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      //      val ti = new TiContext(sc)
      sc.sql("use jerry_prod")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      //      ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")


      import sc.implicits._
      val stat_date = GmeiConfig.getMinusNDate(1)
      //println(param.date)
      val partition_date = stat_date.replace("-","")

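      // Suspected agency/spam devices (hospital-page pv_ratio >= 0.95), unioned with the manual
      // blacklist below into the exclusion view final_id.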
      val agency_id = sc.sql(
        s"""
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_day
           |WHERE partition_date >= '20180402'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
           |UNION ALL
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_month
           |WHERE partition_date >= '20171101'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
         """.stripMargin
      )
      agency_id.createOrReplaceTempView("agency_id")

      val blacklist_id = sc.sql(
        s"""
           |SELECT device_id
           |from blacklist
         """.stripMargin
      )
      blacklist_id.createOrReplaceTempView("blacklist_id")

      val final_id = sc.sql(
        s"""
           |select device_id
           |from agency_id
           |UNION ALL
           |select device_id
           |from blacklist_id
         """.stripMargin
      )
      final_id.createOrReplaceTempView("final_id")


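      // Total gengmei_price and number of distinct paying devices for status-2 orders paid
      // between 2018-11-01 and 2018-11-30, excluding devices in final_id.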
      val diary_clk_all = sc.sql(
        s"""
           |select sum(md.gengmei_price) as pay_all,count(distinct(md.device_id)) as consum_num
           |from online.ml_meigou_order_detail md left join final_id
           |on md.device_id = final_id.device_id
           |where md.status= 2
           |and final_id.device_id is  null
           |and md.partition_date = '20181218'
           |and  md.pay_time  is  not  null
           |and  md.pay_time  >=  '2018-11-01'
           |and md.pay_time  <=  '2018-11-30'
       """.stripMargin
      )
      diary_clk_all.show(80)


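      // Monthly active devices as of partition 20181130, excluding the promotion channel list.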
      val active_num = sc.sql(
        s"""
           |select count(distinct(device_id)) as active_num
           |from online.ml_device_month_active_status
           |where partition_date = '20181130'
           |and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
           |        ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
           |        ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
           |        ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
           |        ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
           |        ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
           |        ,'promotion_shike','promotion_julang_jl03')
       """.stripMargin
      )
      active_num.show(80)

    }


  }

}




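// hospital_gengmei: exports the hospital dimension (id, name, location, city_id) from the fixed
// 20181219 partition of online.tl_hdfs_hospital_view into the hospital_gengmei table
// (SaveMode.Append) via GmeiConfig's JDBC writer.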
object hospital_gengmei {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String] ("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      //      val ti = new TiContext(sc)
      sc.sql("use jerry_prod")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      //      ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")


      import sc.implicits._
      val stat_date = GmeiConfig.getMinusNDate(1)
      //println(param.date)
      val partition_date = stat_date.replace("-","")

      val hospital_gengmei = sc.sql(
        s"""
           |SELECT id,name,location,city_id
           |FROM online.tl_hdfs_hospital_view
           |WHERE partition_date = '20181219'
         """.stripMargin
      )
      hospital_gengmei.show()
      GmeiConfig.writeToJDBCTable(hospital_gengmei, "hospital_gengmei", SaveMode.Append)

    }


  }

}






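// meigou_xiaofei_renshu: per-user total gengmei_price of status-2 orders validated since 2017,
// excluding orders flagged in mining.ml_order_spam_recognize. The show()/write calls are
// currently commented out, so the job only builds the DataFrame.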
object meigou_xiaofei_renshu {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String] ("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      //      val ti = new TiContext(sc)
      sc.sql("use jerry_prod")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      //      ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")



      import sc.implicits._
      //      val stat_date = GmeiConfig.getMinusNDate(1)
      val stat_date=param.date
      //println(param.date)
      val partition_date = stat_date.replace("-","")

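      // Suspected agency/spam devices plus the blacklist, collected into final_id; note that only
      // the commented-out variant of the query below actually uses it.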
      val agency_id = sc.sql(
        s"""
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_day
           |WHERE partition_date >= '20180402'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
           |UNION ALL
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_month
           |WHERE partition_date >= '20171101'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
         """.stripMargin
      )
      agency_id.createOrReplaceTempView("agency_id")

      val blacklist_id = sc.sql(
        s"""
           |SELECT device_id
           |from blacklist
         """.stripMargin
      )
      blacklist_id.createOrReplaceTempView("blacklist_id")

      val final_id = sc.sql(
        s"""
           |select device_id
           |from agency_id
           |UNION ALL
           |select device_id
           |from blacklist_id
         """.stripMargin
      )
      final_id.createOrReplaceTempView("final_id")


      //      val meigou_price = sc.sql(
      //        s"""
      //           |select  md.user_id,sum(md.gengmei_price) as pay_all
      //           |from online.ml_meigou_order_detail md left join final_id
      //           |on md.device_id = final_id.device_id
      //           |where md.status= 2
      //           |and final_id.device_id is  null
      //           |and md.partition_date = '20181223'
      //           |and  md.pay_time  is  not  null
      //           |and  md.validate_time>'2017-01-01 00:00:00.0'
      //           |group by md.user_id
      //           |order by sum(md.gengmei_price)
      //       """.stripMargin
      //      )
      //      meigou_price.show(80)


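      // Per-user total gengmei_price for status-2 orders (pay_time not null, validate_time after
      // 2017-01-01), excluding orders matched in mining.ml_order_spam_recognize for 20181223.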
      val meigou_price = sc.sql(
        s"""
           |select  md.user_id,sum(md.gengmei_price) as pay_all
           |from online.ml_meigou_order_detail md
           |left join
           |(
           |        SELECT
           |        order_id
           |        FROM mining.ml_order_spam_recognize
           |        WHERE partition_date='20181223' AND
           |        self_support=0 AND dayBitsGetW1(predict_result,'20181223')=0
           |)spam
           |on md.order_id = spam.order_id
           |where md.status= 2
           |and  spam.order_id is null
           |and md.partition_date = '20181223'
           |and  md.pay_time  is  not  null
           |and  md.validate_time>'2017-01-01 00:00:00.0'
           |group by md.user_id
           |order by sum(md.gengmei_price)
       """.stripMargin
      )
      //      meigou_price.show(80)

      //      GmeiConfig.writeToJDBCTable(meigou_price, "meigou_price", SaveMode.Overwrite)

    }


  }

}




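// alpha_ctr: daily CTR and duration metrics for the alpha app. It counts feed-topic-card clicks
// and precise exposures on the home recommend/focus tabs, joins them into one row per stat_date
// for the alpha_ctr table, and writes active-device count plus average duration to alpha_duration.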
object alpha_ctr {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String] ("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      //      val ti = new TiContext(sc)
      sc.sql("use jerry_prod")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      //      ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")


      import sc.implicits._
      val stat_date = GmeiConfig.getMinusNDate(1)
//      val stat_date = param.date
      //println(param.date)
      val partition_date = stat_date.replace("-","")

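      // Clicks on feed topic cards for the stat date, split by home tab (recommend vs focus).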
      val click_count_recommend = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(*) as click_count_recommend
           |from bl.bl_alpha_et_mg_maidianlog_inc_d
           |where params['tab_name']='recommend'
           |and params['page_name']='home'
           |and type='on_click_feed_topic_card'
           |and partition_day='${partition_date}'
       """.stripMargin
      )
      click_count_recommend.show()

      val click_count_focus = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(*) as click_count_focus
           |from bl.bl_alpha_et_mg_maidianlog_inc_d
           |where params['tab_name']='focus'
           |and params['page_name']='home'
           |and type='on_click_feed_topic_card'
           |and partition_day='${partition_date}'
       """.stripMargin
      )
      click_count_focus.show()


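      // exposure_cards is a JSON array of card objects; parseFull returns it as a
      // List[Map[String, Any]], so the list size is the number of cards exposed in that event.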
      def parse_json(str:String): Int ={
        var t = List[Map[String, Any]]()
        val result = JSON.parseFull(str)
        result match {
          case Some(b: List[Map[String, Any]]) => t = t ++ b
          case None => println("Parsing failed")
          case other => println("Unknown data structure: " + other)
        }
        t.size

      }

      val expoure_cards=sc.sql(
        s"""
           |select params['exposure_cards'] as exposure_cards
           |from bl.bl_alpha_et_mg_maidianlog_inc_d
           |where params['tab_name'] = 'recommend'
           |and params['page_name'] = 'home'
           |and type = 'page_precise_exposure'
           |and partition_day='${partition_date}'
       """.stripMargin
      )
      val a =expoure_cards.rdd.map(row => row(0).toString).map(row=>parse_json(row)).collect().sum
      val result1=List((stat_date,a))
      val df1 = sc.createDataFrame(result1).toDF("stat_date","expoure_count_recommend")

      val expoure_cards2=sc.sql(
        s"""
           |select params['exposure_cards'] as exposure_cards
           |from bl.bl_alpha_et_mg_maidianlog_inc_d
           |where params['tab_name'] = 'focus'
           |and params['page_name'] = 'home'
           |and type = 'page_precise_exposure'
           |and partition_day='${partition_date}'
       """.stripMargin
      )
      val b =expoure_cards2.rdd.map(row => row(0).toString).map(row=>parse_json(row)).collect().sum
      val result2=List((stat_date,b))
      val df2 = sc.createDataFrame(result2).toDF("stat_date","expoure_count_focus")



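      // One row per stat_date with click and exposure counts for both tabs; per-tab CTR can be
      // derived downstream as clicks / exposures.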
      val result=click_count_recommend.join(click_count_focus,"stat_date")
        .join(df1,"stat_date")
        .join(df2,"stat_date")



//      GmeiConfig.writeToJDBCTable(result, "alpha_ctr", SaveMode.Append)
//      GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",result, table="alpha_ctr",SaveMode.Append)

      println("开始写入")
//      GmeiConfig.writeToJDBCTable("jerry.jdbcuri",result, table="alpha_ctr",SaveMode.Append)

      //      GmeiConfig.writeToJDBCTable(result, "alpha_ctr", SaveMode.Append)
      GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",result, table="alpha_ctr",SaveMode.Append)
      println("写入完成")



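      // Active-device count and average per-device duration (open_times != 0) for the stat date.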
      val device_num_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(DISTINCT(device_id)) as device_num
           |from ml.ML_ALPHA_C_CT_DV_DEVICE_DIMEN_D
           |where partition_day='${partition_date}'
           |and is_today_active='true'
       """.stripMargin
      )
      device_num_count.show()

      val duration_device=sc.sql(
        s"""
           |select '${stat_date}' as stat_date,sum(user_duration)/count(DISTINCT(device_id)) as device_duration
           |from ml.ML_ALPHA_C_CT_DV_DEVICE_INDIC_INC_D
           |WHERE partition_day='${partition_date}'
           |and open_times!="0"
       """.stripMargin
      )

      val result3=device_num_count.join(duration_device,"stat_date")

//      GmeiConfig.writeToJDBCTable(result3, "alpha_duration", SaveMode.Append)
      GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",result3, table="alpha_duration",SaveMode.Append)
      println("写入完成")

    }


  }

}



// Topic-related question statistics


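// copy_database: joins Knowledge_network entries with online.tl_hdfs_wiki_item_tag_view and the
// tag hierarchy so each item carries its level2 tag id/name, then overwrites the
// train_Knowledge_network_data table in jerry_test.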
object copy_database {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String] ("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      //      val ti = new TiContext(sc)
      sc.sql("use jerry_prod")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      //      ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      //      ti.tidbMapTable(dbName = "jerry_test", tableName = "tl_hdfs_wiki_item_tag_view")
      //      ti.tidbMapTable(dbName = "jerry_test", tableName = "Knowledge_network")
      //      ti.tidbMapTable(dbName = "eagle", tableName = "src_mimas_prod_api_diary")


      import sc.implicits._
      val stat_date = GmeiConfig.getMinusNDate(1)
      //      val stat_date=param.date
      val partition_date = stat_date.replace("-","")

      val new_data = sc.sql(
        s"""
           |select d.level2_id,d.level2_name,c.item_id,c.tag_id,c.id,c.name,c.treatment_method,c.price_min,c.price_max,c.treatment_time,c.maintain_time,c.recover_time
           |from online.bl_tag_hierarchy_detail d
           |inner join
           |(select a.item_id,a.tag_id,b.id,b.name,b.treatment_method,b.price_min,b.price_max,b.treatment_time,b.maintain_time,b.recover_time
           |from online.tl_hdfs_wiki_item_tag_view a
           |inner join Knowledge_network b
           |on a.item_id=b.id
           |where a.partition_date='${partition_date}') c
           |on d.id=c.tag_id
           |where d.partition_date='${partition_date}'
       """.stripMargin
      )

      GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",new_data, "train_Knowledge_network_data", SaveMode.Overwrite)


    }


  }

}