package com.gmei import java.io.Serializable import java.text.SimpleDateFormat import breeze.linalg.split import com.gmei.WeafareStat.{defaultParams, parser} import org.apache.spark.sql.{Row, SaveMode, SparkSession, TiContext} import org.apache.log4j.{Level, Logger} import scopt.OptionParser import com.gmei.lib.AbstractParams import com.github.nscala_time.time.Imports._ import java.text.SimpleDateFormat import java.util.Date import scala.util.parsing.json.JSON object temp_count { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = "2018-08-01" ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("WeafareStat") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String] ("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 val ti = new TiContext(sc) ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click") ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure") ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table") val stat_date = GmeiConfig.getMinusNDate(1) // val stat_date = param.date //println(param.date) val partition_date = stat_date.replace("-","") val decive_id_oldUser = sc.sql( s""" |select distinct(device_id) as device_id |from online.ml_device_day_active_status |where active_type = '4' |and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3' | ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang' | ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1' | ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4' | ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100' | ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ' | ,'promotion_shike','promotion_julang_jl03','','unknown') |and partition_date ='${partition_date}' """.stripMargin ) decive_id_oldUser.createOrReplaceTempView("device_id_old") val clk_count_oldUser = sc.sql( s""" |select '${stat_date}' as stat_date, count(jd.cid_id) as clk_count_oldUser |from data_feed_click jd inner join device_id_old |on jd.device_id = device_id_old.device_id |where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video') |and jd.device_id not in (select device_id from blacklist) |and jd.city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan") |and jd.stat_date ='${stat_date}' """.stripMargin ) val imp_count_oldUser = sc.sql( s""" |select '${stat_date}' as stat_date, count(cid_id) as imp_count_oldUser |from data_feed_exposure je inner join device_id_old |on je.device_id = device_id_old.device_id |where je.cid_type = 'diary' |and je.device_id not in (select device_id from blacklist) |and je.city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan") |and je.stat_date ='${stat_date}' """.stripMargin ) val clk_count_all = sc.sql( s""" |select '${stat_date}' as stat_date, count(cid_id) as clk_count_all |from data_feed_click |where (cid_type = 'diary' or cid_type = 'diary_video') |and device_id not in (select device_id from blacklist) |and city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan") |and stat_date ='${stat_date}' """.stripMargin ) val imp_count__all = sc.sql( s""" |select '${stat_date}' as stat_date, count(cid_id) as imp_count_all |from data_feed_exposure |where cid_type = 'diary' |and device_id not in (select device_id from blacklist) |and city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan") |and stat_date ='${stat_date}' """.stripMargin ) val clk_count_oldUser_Contrast = sc.sql( s""" |select '${stat_date}' as stat_date, count(cid_id) as clk_count_oldUser_Contrast |from data_feed_click jd inner join device_id_old |on jd.device_id = device_id_old.device_id |where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video') |and jd.device_id regexp'1$$' |and jd.device_id not in (select device_id from blacklist) |and jd.city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan") |and jd.stat_date ='${stat_date}' """.stripMargin ) val imp_count_oldUser_Contrast = sc.sql( s""" |select '${stat_date}' as stat_date, count(cid_id) as imp_count_oldUser_Contrast |from data_feed_exposure je inner join device_id_old |on je.device_id = device_id_old.device_id |where je.cid_type = 'diary' |and je.device_id regexp'1$$' |and je.device_id not in (select device_id from blacklist) |and je.city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan") |and je.stat_date ='${stat_date}' """.stripMargin ) val result1 = clk_count_oldUser.join(imp_count_oldUser,"stat_date") .join(clk_count_all,"stat_date") .join(imp_count__all,"stat_date") .join(clk_count_oldUser_Contrast,"stat_date") .join(imp_count_oldUser_Contrast,"stat_date") result1.show() GmeiConfig.writeToJDBCTable(result1, "ffm_diary_ctr", SaveMode.Append) } } } object Repeated_content_recommendation { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = "2018-08-01" ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("WeafareStat") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String] ("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 val ti = new TiContext(sc) ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure_precise") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click") ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure") ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table") val stat_date = GmeiConfig.getMinusNDate(1) // val stat_date = param.date val partition_date = stat_date.replace("-","") val exp_diary = sc.sql( s""" |select concat_ws('|',device_id,cid_id) |from data_feed_exposure |where cid_type = 'diary' |and device_id not in (select device_id from blacklist) |and stat_date ='${stat_date}' """.stripMargin ) exp_diary.show() val get_result =exp_diary.rdd.map((_, 1)).reduceByKey(_ + _) .sortBy(_._2,false) val more_than2=get_result.filter(_._2 >=2).map(_._2).reduce((x,y)=>x+y) println(more_than2) val all =get_result.map(_._2).reduce((x,y)=>x+y) println(all) val repeated_rate= more_than2 / all.toDouble println(repeated_rate) val test=List((stat_date,repeated_rate)) val df = sc.createDataFrame(test) GmeiConfig.writeToJDBCTable(df, table = "Repeated_evaluation_indicator", SaveMode.Append) // val temp=get_result.collect() // for (i <- 0 until 30 ) { // println(temp(i)) // } } } } object Repeated_content_recommendation_moreday { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = "2018-08-01" ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("WeafareStat") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String] ("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 val ti = new TiContext(sc) ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure_precise") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click") ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure") ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table") // val stat_date = GmeiConfig.getMinusNDate(1) // val stat_date = "2019-01-16" // val partition_date = stat_date.replace("-","") val now= new Date() val dateFormat = new SimpleDateFormat("yyyy-MM-dd") val date = dateFormat.format(now.getTime - 86400000L * 15) val yesterday=dateFormat.format(now.getTime- 86400000L) val exp_diary = sc.sql( s""" |select stat_date,device_id,concat_ws(',',collect_set(distinct cid_id)) as expoure_diary |from data_feed_exposure_precise |where cid_type = 'diary' |and stat_date >='${date}' |and device_id not in (select device_id from blacklist) |group by device_id,stat_date """.stripMargin ).rdd.map(row=>(row(0).toString,row(1).toString,row(2).toString)).map(row=>(row._2,row._3)).groupByKey() .filter(x => x._2.size >1) //打印结果 // val temp=exp_diary.take(10).foreach(println) // val count_imp=exp_diary.map(_._2).map(row=>row.flatMap(x=>x.split(",")).toArray) // .map(x => (x,x)).map(x => (x._1.distinct.size,x._2.size)).map(x => (x._2-x._1,x._2)) //统计每个用户重复日记个数 val count_imp=exp_diary.map(_._2).map(row=>row.flatMap(x=>x.split(",")).toArray) .map(x => (x,x)).map(x => (x._1.distinct.size,x._2.size)).map(x => (x._2-x._1,x._2)).collect() val fenmu = count_imp.map(x => x._1).reduce((x,y) => x+y) val fenzi = count_imp.map(x => x._2).reduce((x,y) => x+y) val repeated_rate= fenmu / fenzi.toDouble val result=List((yesterday,repeated_rate)) val df_result = sc.createDataFrame(result) GmeiConfig.writeToJDBCTable(df_result, table = "Repeated_content_recommendation_moreday", SaveMode.Append) // exp_diary.show() // exp_diary.createOrReplaceTempView("exp_diary") // GmeiConfig.writeToJDBCTable(df, table = "Repeated_evaluation_indicator_moreday", SaveMode.Append) } } } object GetHiveSearchData { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = "2018-08-01" ) extends AbstractParams[Params] with Serializable case class GetHiveSearchData_temp(stat_date:String,diaryExposureVal:String,diaryClickNum:String,meigouExposureVal:String,meigouClickNum:String,searchResultExposureVal:String,searchResultClickNum:String,searchDoctorExposureVal:String,searchDoctorClickNum:String,searchHospitalExposureVal:String,searchHospitalClickNum:String) val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("WeafareStat") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String] ("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 val ti = new TiContext(sc) ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure_precise") ti.tidbMapTable(dbName = "jerry_prod", tableName = "GetHiveSearchData_CTR") ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure") ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table") val stat_date = GmeiConfig.getMinusNDate(1) // val stat_date = param.date val partition_date = stat_date.replace("-","") val strDiaryExposureAction = "/api/search/v2/diary" val strDiaryClickAction = "search_result_more_diary_click_item" //需要确认 var (diaryExposureVal,diaryClickNum,diaryExposureMapCount,diaryExposureFilterCount) = GetSearchResultData(sc,strDiaryExposureAction,strDiaryClickAction,stat_date) val strMeigouExposureAction = "/api/search/v2/service" val strMeigouClickAction = "search_result_welfare_click_item" var (meigouExposureVal,meigouClickNum,meigouExposureMapCount,meigouExposureFilterCount) = GetSearchResultData(sc,strMeigouExposureAction,strMeigouClickAction,stat_date) val strSearchResultExposureAction = "/api/search/v2/content" val strSearchResultClickAction = "search_result_click_diary_item" //需要确认 var (searchResultExposureVal,searchResultClickNum,searchResultExposureMapCount,searchResultExposureFilterCount) = GetSearchResultData(sc,strSearchResultExposureAction,strSearchResultClickAction,stat_date) val strSearchDoctorExposureAction = "/api/search/v2/doctor" val strSearchDoctorClickAction = "search_result_doctor_click_item" var (searchDoctorExposureVal,searchDoctorClickNum,searchDoctorExposureMapCount,searchDoctorExposureFilterCount) = GetSearchResultData(sc,strSearchDoctorExposureAction,strSearchDoctorClickAction,stat_date) val strSearchHospitalExposureAction = "/api/search/v2/hospital" val strSearchHospitalClickAction = "search_result_hospital_click_item" var (searchHospitalExposureVal,searchHospitalClickNum,searchHospitalExposureMapCount,searchHospitalExposureFilterCount) = GetSearchResultData(sc,strSearchHospitalExposureAction,strSearchHospitalClickAction,stat_date) val jigou_id = sc.sql( s""" |SELECT cl_id |FROM online.ml_hospital_spam_pv_day |WHERE partition_date>='20180402' AND partition_date<'${partition_date}' |AND pv_ratio>=0.95 |UNION ALL |SELECT cl_id |FROM online.ml_hospital_spam_pv_month |WHERE partition_date>='20171101' AND partition_date<'${partition_date}' |AND pv_ratio>=0.95 |UNION ALL |select device_id as cl_id from blacklist """.stripMargin ) jigou_id.createOrReplaceTempView("jigou_id") val diary_clickSql = sc.sql( s""" |select |count(1) click_num |from online.tl_hdfs_maidian_view ov left join jigou_id |on ov.cl_id = jigou_id.cl_id |where ov.partition_date='${partition_date}' |and ov.action='on_click_diary_card' |and ov.params['page_name']='search_result_diary' |and jigou_id.cl_id is null """.stripMargin ) val diary_clickArray = diary_clickSql.collect() val diary_click_num = diary_clickArray(0).getAs[Long]("click_num") val content_diary_clickSql = sc.sql( s""" |select |count(1) click_num |from online.tl_hdfs_maidian_view ov left join jigou_id |on ov.cl_id = jigou_id.cl_id |where ov.partition_date='${partition_date}' |and ov.action='on_click_diary_card' |and ov.params['page_name']='search_result_more' |and jigou_id.cl_id is null """.stripMargin ) val content_diary_clickArray:Array[Row] = content_diary_clickSql.collect() val content_diary_click_num:Long = content_diary_clickArray(0).getAs[Long]("click_num") println("searchDiaryExposureVal:" + diaryExposureVal + "\tsearchDiaryClickNum:" + diary_click_num + "\tclickRate:" + (diary_click_num.floatValue()/diaryExposureVal.floatValue()).formatted("%.2f")) println("searchMeigouExposureVal:" + meigouExposureVal + "\tsearchMeigouClickNum:" + meigouClickNum + "\tclickRate:" + (meigouClickNum.floatValue()/meigouExposureVal.floatValue()).formatted("%.2f")) println("searchResultExposureVal:" + searchResultExposureVal + "\tsearchResultClickNum:" + (searchResultClickNum+content_diary_click_num) + "\tclickRate:" + ((searchResultClickNum+content_diary_click_num).floatValue()/searchResultExposureVal.floatValue()).formatted("%.2f")) println("searchDoctorExposureVal:" + searchDoctorExposureVal + "\tsearchDoctorClickNum:" + searchDoctorClickNum + "\tclickRate:" + (searchDoctorClickNum.floatValue()/searchDoctorExposureVal.floatValue()).formatted("%.2f")) println("searchHospitalExposureVal:" + searchHospitalExposureVal + "\tsearchHospitalClickNum:" + searchHospitalClickNum + "\tclickRate:" + (searchHospitalClickNum.floatValue()/searchHospitalExposureVal.floatValue()).formatted("%.2f")) // val add_data = sc.sql( // s""" // |insert into table GetHiveSearchData_CTR(stat_date,diaryExposureVal,diaryClickNum,meigouExposureVal,meigouClickNum,searchResultExposureVal,searchResultClickNum,searchDoctorExposureVal,searchDoctorClickNum,searchHospitalExposureVal,searchHospitalClickNum) // |values ('${stat_date}','${diaryExposureVal}','${(diary_click_num+diaryClickNum)}','${meigouExposureVal}','${meigouClickNum}','${searchResultExposureVal}','${(searchResultClickNum+content_diary_click_num)}','${searchDoctorExposureVal}','${searchDoctorClickNum}','${searchHospitalExposureVal}','${searchHospitalClickNum}') // """.stripMargin // ) import sc.implicits._ val result=List((stat_date,diaryExposureVal,(diary_click_num+diaryClickNum),meigouExposureVal,meigouClickNum,searchResultExposureVal,(searchResultClickNum+content_diary_click_num),searchDoctorExposureVal,searchDoctorClickNum,searchHospitalExposureVal,searchHospitalClickNum)) // val df_result = sc.createDataFrame(result).map(x=>GetHiveSearchData_temp(x(0).toString,x(1).toString,x(2).toString,x(3).toString,x(4).toString,x(5).toString,x(6).toString,x(7).toString,x(8).toString,x(9).toString,x(10).toString)).toDF() val df_result = sc.sparkContext.parallelize(result).map(x=>(x._1,x._2,x._3,x._4,x._5,x._6,x._7,x._8,x._9,x._10,x._11)).toDF("stat_date","diaryExposureVal","diaryClickNum","meigouExposureVal","meigouClickNum","searchResultExposureVal","searchResultClickNum","searchDoctorExposureVal", "searchDoctorClickNum","searchHospitalExposureVal","searchHospitalClickNum") // GmeiConfig.writeToJDBCTable(df_result, table = "GetHiveSearchData_CTR", SaveMode.Append) } def GetSearchResultData(spark: SparkSession, strExposureAction:String, strClickAction:String,stat_date:String) = { val partition_date = stat_date.replace("-","") val exposureAccum = spark.sparkContext.longAccumulator("search exposure data") val jigou_id = spark.sql( s""" |SELECT cl_id |FROM online.ml_hospital_spam_pv_day |WHERE partition_date>='20180402' AND partition_date<'${partition_date}' |AND pv_ratio>=0.95 |UNION ALL |SELECT cl_id |FROM online.ml_hospital_spam_pv_month |WHERE partition_date>='20171101' AND partition_date<'${partition_date}' |AND pv_ratio>=0.95 |UNION ALL |select device_id as cl_id from blacklist """.stripMargin ) jigou_id.createOrReplaceTempView("jigou_id") val exposureSql = spark.sql( s""" |select action,user_id,city_id,app |from online.tl_hdfs_backend_view ov left join jigou_id |on ov.cl_id = jigou_id.cl_id |where ov.action='$strExposureAction' |and ov.partition_date='${partition_date}' |and jigou_id.cl_id is null """.stripMargin ) val exposureMapResult = exposureSql.rdd.map(row => { //val jsonObj = JSON.parseFull(row.getAs[Map[String,Any]]("app").toString()) val rowAppFieldMap:Map[String,Any] = row.getAs[Map[String,Any]]("app") if (rowAppFieldMap.nonEmpty) { // jsonMap:Map[String,Any] = jsonObj.get.asInstanceOf[Map[String,Any]] if (rowAppFieldMap.contains("exposure_data")){ val exposure_data_lists:List[Any] = JSON.parseFull(rowAppFieldMap("exposure_data").toString).get.asInstanceOf[List[Any]] if (exposure_data_lists.length > 0){ exposure_data_lists.foreach(exposure_item=>{ if (exposure_item!=None && exposure_item.toString.nonEmpty){ val exposureItemMap:Map[String,Any] = exposure_item.asInstanceOf[Map[String,Any]] if (exposureItemMap.contains("list_ids")){ //val exposure_list_ids:List[Any] = exposureItemMap.get("list_ids").get.asInstanceOf[List[Any]] val exposure_list_ids:List[Any] = exposureItemMap("list_ids").asInstanceOf[List[Any]] exposureAccum.add(exposure_list_ids.length) } }else{ None } }) exposure_data_lists }else{ None } } }else{ None } }) //must add cache exposureMapResult.cache() val exposureFilterResult = exposureMapResult.filter(_.!=(None)) //val exposureArray:Array[Any] = exposureFilterResult.collect() //exposureArray.foreach(item => println(item.toString)) val exposureMapCount:Long = exposureMapResult.count() val exposureFilterCount:Long = exposureFilterResult.count() val clickSql = spark.sql( s""" |select |count(1) click_num |from online.tl_hdfs_maidian_view ov left join jigou_id |on ov.cl_id = jigou_id.cl_id |where ov.partition_date='${partition_date}' |and ov.action='$strClickAction' |and jigou_id.cl_id is null """.stripMargin ) val clickArray:Array[Row] = clickSql.collect() val click_num:Long = clickArray(0).getAs[Long]("click_num") (exposureAccum.value,click_num,exposureMapCount,exposureFilterCount) } // GmeiConfig.writeToJDBCTable(df_result, table = "Repeated_content_recommendation_moreday", SaveMode.Append) // exp_diary.show() // exp_diary.createOrReplaceTempView("exp_diary") // GmeiConfig.writeToJDBCTable(df, table = "Repeated_evaluation_indicator_moreday", SaveMode.Append) } }