package com.gmei

import java.io.Serializable

import com.gmei.WeafareStat.{defaultParams, parser}
//import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.spark.sql.{SaveMode}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams


/**
  * Daily diary / search / exposure counters, split by new and old devices.
  *
  * For one statistics date the job counts, per cohort (new vs. old devices):
  *   1. page views of `welfare_detail` whose referrer is `diary_detail`,
  *   2. `on_click_diary_card` events,
  *   3. diary exposures,
  *   4. search events,
  *   5. distinct devices present in `data_feed_exposure`.
  *
  * Devices listed in `agency_id` (hospital spam lists) or in `blacklist` are removed first.
  * The ten counters are joined on `stat_date` into one row and appended to the
  * `diary_meigou_crv` table over JDBC.
  */
object testt {

  // NOTE(review): the Jetty logger is normally named "org.eclipse.jetty.server";
  // the name below is kept as found - confirm whether it has any effect.
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  /**
    * Command line parameters.
    *
    * @param env  environment name handed to `GmeiConfig.setup`
    * @param date parsed from `--date` but currently NOT used: the statistics date is
    *             always taken from `GmeiConfig.getMinusNDate(1)`
    */
  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text(s"the date you used")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }

  /** `first_channel_source_type` values excluded from both cohorts, rendered as a SQL `IN` list body. */
  private val ExcludedChannels: String = Seq(
    "yqxiu1", "yqxiu2", "yqxiu3", "yqxiu4", "yqxiu5", "mxyc1", "mxyc2", "mxyc3",
    "wanpu", "jinshan", "jx", "maimai", "zhuoyi", "huatian", "suopingjingling", "mocha", "mizhe", "meika", "lamabang",
    "js-az1", "js-az2", "js-az3", "js-az4", "js-az5", "jfq-az1", "jfq-az2", "jfq-az3", "jfq-az4", "jfq-az5", "toufang1",
    "toufang2", "toufang3", "toufang4", "toufang5", "toufang6", "TF-toufang1", "TF-toufang2", "TF-toufang3", "TF-toufang4",
    "TF-toufang5", "tf-toufang1", "tf-toufang2", "tf-toufang3", "tf-toufang4", "tf-toufang5", "benzhan", "promotion_aso100",
    "promotion_qianka", "promotion_xiaoyu", "promotion_dianru", "promotion_malioaso", "promotion_malioaso-shequ",
    "promotion_shike", "promotion_julang_jl03", "", "unknown", "promotion_zuimei"
  ).map(channel => s"'$channel'").mkString(",")

  /**
    * Job entry point.
    *
    * @param args command line arguments understood by [[parser]]; when they cannot be
    *             parsed nothing is executed (scopt has already printed the usage text)
    */
  def main(args: Array[String]): Unit = {
    // foreach rather than map: the body is executed only for its side effects.
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2
      sc.sql("use jerry_prod")

      // The TiContext table mappings (diary_video, data_feed_click, blacklist,
      // data_feed_exposure) used by an earlier version are no longer required.

      // Statistics date; using `param.date` instead is currently disabled.
      val stat_date = GmeiConfig.getMinusNDate(1)
      val partition_date = stat_date.replace("-", "")

      /** Runs `query`, exposes the result as temp view `viewName` and prints a sample. */
      def registerView(viewName: String, query: String): Unit = {
        val df = sc.sql(query)
        df.createOrReplaceTempView(viewName)
        df.show()
      }

      /** Copies `tempView` into `deviceView` without the rows whose device is in `blacklist`. */
      def withoutBlacklist(tempView: String, deviceView: String): Unit = registerView(deviceView,
        s"""
           |select dt.device_id
           |from ${tempView} dt left join blacklist
           |on dt.device_id = blacklist.device_id
           |where blacklist.device_id is null
         """.stripMargin)

      /** Registers the cohort view `viewName`: active devices of the day matching `activeTypeFilter`. */
      def activeDevices(viewName: String, activeTypeFilter: String): Unit = registerView(viewName,
        s"""
           |select distinct(os.device_id) as device_id
           |from online.ml_device_day_active_status os left join blacklist
           |on os.device_id=blacklist.device_id
           |where ${activeTypeFilter}
           |and os.first_channel_source_type not in (${ExcludedChannels})
           |and os.partition_date ='${partition_date}'
           |and blacklist.device_id is null
         """.stripMargin)

      /**
        * One-row frame (`stat_date`, `metric`) counting the rows of `deviceView` whose
        * device belongs to `cohortView`; with `distinct` each device is counted once.
        */
      def cohortCount(deviceView: String, cohortView: String, metric: String, distinct: Boolean = false) = {
        val counted = if (distinct) "count(distinct(ev.device_id))" else "count(ev.device_id)"
        val df = sc.sql(
          s"""
             |select '${stat_date}' as stat_date, ${counted} as ${metric}
             |from ${deviceView} ev left join ${cohortView}
             |on ev.device_id = ${cohortView}.device_id
             |where ${cohortView}.device_id is not null
           """.stripMargin)
        df.show()
        df
      }

      // Blacklisted devices; the temp view shadows the table of the same name.
      sc.sql("select device_id from blacklist").createOrReplaceTempView("blacklist")

      // Agency (institution) device ids
      registerView("agency_id",
        s"""
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_day
           |WHERE partition_date >= '20180402'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
           |UNION ALL
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_month
           |WHERE partition_date >= '20171101'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
         """.stripMargin)

      // Daily new users
      activeDevices("device_id_new", "(os.active_type = '1' or os.active_type='2')")
      // Daily old users
      activeDevices("device_id_old", "os.active_type = '4'")

      // Diary -> welfare (meigou) conversion
      // 1. welfare detail page views that were reached from a diary detail page
      registerView("diary_meigou_temp",
        s"""
           |select ou.cl_id as device_id
           |from online.bl_hdfs_page_view_updates ou left join agency_id
           |on ou.cl_id = agency_id.device_id
           |where ou.partition_date = '${partition_date}'
           |and ou.page_name='welfare_detail'
           |and ou.referrer='diary_detail'
           |and agency_id.device_id is null
         """.stripMargin)
      withoutBlacklist("diary_meigou_temp", "diary_meigou_device")
      // new / old users reaching the welfare detail page
      val diary_meigou_newUser = cohortCount("diary_meigou_device", "device_id_new", "diary_meigou_newUser")
      val diary_meigou_oldUser = cohortCount("diary_meigou_device", "device_id_old", "diary_meigou_oldUser")

      // 2. diary card clicks
      registerView("diary_clk_temp",
        s"""
           |select ov.cl_id as device_id
           |from online.tl_hdfs_maidian_view ov left join agency_id
           |on ov.cl_id = agency_id.device_id
           |where ov.action = 'on_click_diary_card'
           |and ov.cl_id != "NULL"
           |and ov.partition_date='${partition_date}'
           |and agency_id.device_id is null
         """.stripMargin)
      withoutBlacklist("diary_clk_temp", "diary_clk_device")
      val diary_clk_newUser = cohortCount("diary_clk_device", "device_id_new", "diary_clk_newUser")
      val diary_clk_oldUser = cohortCount("diary_clk_device", "device_id_old", "diary_clk_oldUser")

      // 3. diary exposures
      registerView("diary_expoure_temp",
        s"""
           |select od.cl_id as device_id
           |from online.ml_community_exposure_detail od left join agency_id
           |on od.cl_id = agency_id.device_id
           |where od.business_type = "diary"
           |and od.cl_id != "NULL"
           |and od.partition_date='${partition_date}'
           |and agency_id.device_id is null
         """.stripMargin)
      withoutBlacklist("diary_expoure_temp", "diary_expoure_device")
      val diary_exp_newUser = cohortCount("diary_expoure_device", "device_id_new", "diary_exp_newUser")
      val diary_exp_oldUser = cohortCount("diary_expoure_device", "device_id_old", "diary_exp_oldUser")

      // 4. search events
      registerView("search_device_temp",
        s"""
           |select ov.cl_id as device_id
           |from online.tl_hdfs_maidian_view ov left join agency_id
           |on ov.cl_id = agency_id.device_id
           |where (ov.action = 'do_search' or ov.action = 'search_result_click_search')
           |and ov.partition_date ='${partition_date}'
           |and agency_id.device_id is null
         """.stripMargin)
      withoutBlacklist("search_device_temp", "search_device")
      val search_newUser = cohortCount("search_device", "device_id_new", "search_newUser")
      val search_oldUser = cohortCount("search_device", "device_id_old", "search_oldUser")

      // 5. devices seen in the feed exposure log
      // The date and agency predicates must sit in WHERE. As extra ON conditions of the
      // LEFT JOIN they filtered nothing, so every device of every day was returned.
      registerView("log_device_temp",
        s"""
           |select distinct(oe.device_id) as device_id
           |from data_feed_exposure oe left join agency_id
           |on oe.device_id = agency_id.device_id
           |where oe.stat_date ='${stat_date}'
           |and agency_id.device_id is null
         """.stripMargin)
      withoutBlacklist("log_device_temp", "log_device")
      val log_newUser = cohortCount("log_device", "device_id_new", "log_newUser", distinct = true)
      val log_oldUser = cohortCount("log_device", "device_id_old", "log_oldUser", distinct = true)

      // Every frame holds a single row for stat_date, so the joins yield one wide row.
      val result = diary_meigou_newUser.join(diary_meigou_oldUser, "stat_date")
        .join(diary_clk_newUser, "stat_date")
        .join(diary_clk_oldUser, "stat_date")
        .join(diary_exp_newUser, "stat_date")
        .join(diary_exp_oldUser, "stat_date")
        .join(search_newUser, "stat_date")
        .join(search_oldUser, "stat_date")
        .join(log_newUser, "stat_date")
        .join(log_oldUser, "stat_date")

      // SECURITY NOTE(review): root credentials are hard-coded in this URL. They should be
      // moved to configuration (a disabled earlier variant passed the key "jerry.jdbcuri")
      // and the password rotated.
      val jdbcUri = "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"

      println("开始写入")
      GmeiConfig.writeToJDBCTable(jdbcUri, result, table="diary_meigou_crv", SaveMode.Append)
      println("写入完成")
    }
  }

}


/**
  * Daily click and exposure counters for diary cards, split by new and old devices.
  *
  * For each cohort the job counts `on_click_diary_card` clicks, full-screen video
  * card clicks and diary exposures (from `data_feed_exposure` and
  * `data_feed_exposure_precise`), once over all devices of the cohort ("all") and
  * once restricted to device ids ending in `1` ("Contrast"). The sixteen counters
  * are joined on `stat_date` and appended to the `on_click_diary_card` table over JDBC.
  */
object diary_clk_card {

  // NOTE(review): the Jetty logger is normally named "org.eclipse.jetty.server";
  // the name below is kept as found - confirm whether it has any effect.
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  /**
    * Command line parameters.
    *
    * @param env  environment name handed to `GmeiConfig.setup`
    * @param date parsed from `--date` but currently NOT used: the statistics date is
    *             always taken from `GmeiConfig.getMinusNDate(1)`
    */
  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text(s"the date you used")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }

  /**
    * `first_channel_source_type` values excluded from both cohorts, rendered as a SQL `IN` list body.
    * NOTE(review): unlike the list used by `testt`, this one has no 'promotion_zuimei' - confirm intent.
    */
  private val ExcludedChannels: String = Seq(
    "yqxiu1", "yqxiu2", "yqxiu3", "yqxiu4", "yqxiu5", "mxyc1", "mxyc2", "mxyc3",
    "wanpu", "jinshan", "jx", "maimai", "zhuoyi", "huatian", "suopingjingling", "mocha", "mizhe", "meika", "lamabang",
    "js-az1", "js-az2", "js-az3", "js-az4", "js-az5", "jfq-az1", "jfq-az2", "jfq-az3", "jfq-az4", "jfq-az5", "toufang1",
    "toufang2", "toufang3", "toufang4", "toufang5", "toufang6", "TF-toufang1", "TF-toufang2", "TF-toufang3", "TF-toufang4",
    "TF-toufang5", "tf-toufang1", "tf-toufang2", "tf-toufang3", "tf-toufang4", "tf-toufang5", "benzhan", "promotion_aso100",
    "promotion_qianka", "promotion_xiaoyu", "promotion_dianru", "promotion_malioaso", "promotion_malioaso-shequ",
    "promotion_shike", "promotion_julang_jl03", "", "unknown"
  ).map(channel => s"'$channel'").mkString(",")

  /** WHERE predicates selecting diary card clicks on the home page "精选" tab. */
  private val DiaryCardClick: String =
    "ot.action='on_click_diary_card' and ot.params['tab_name'] = '精选' and ot.params['page_name'] = 'home'"

  /** WHERE predicates selecting full-screen video plays of diary cards on the "精选" tab. */
  private val VideoCardClick: String =
    "ot.action='full_stack_click_video_card_full_screen_play' and ot.params['tab_name'] = '精选' and ot.params[\"card_type\"]=\"diary\""

  /**
    * Job entry point.
    *
    * @param args command line arguments understood by [[parser]]; when they cannot be
    *             parsed nothing is executed (scopt has already printed the usage text)
    */
  def main(args: Array[String]): Unit = {
    // foreach rather than map: the body is executed only for its side effects.
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2
      sc.sql("use jerry_prod")

      // The TiContext table mappings and the brickhouse / hive-udf JSON functions
      // registered by an earlier version are no longer required.

      // Statistics date; using `param.date` instead is currently disabled.
      val stat_date = GmeiConfig.getMinusNDate(1)
      val partition_date = stat_date.replace("-", "")

      /** Extra predicate keeping only ids that end in '1' (the "Contrast" bucket), or nothing. */
      def contrastFilter(column: String, contrast: Boolean): String =
        if (contrast) s"and ${column} regexp'1$$'" else ""

      /** One-row frame (`stat_date`, `metric`) counting tracking events of `cohortView` devices that satisfy `actionFilter`. */
      def clickCount(metric: String, cohortView: String, actionFilter: String, contrast: Boolean) = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(ot.cl_id) as ${metric}
           |from online.tl_hdfs_maidian_view ot inner join ${cohortView}
           |on ot.cl_id = ${cohortView}.device_id
           |where ${actionFilter}
           |${contrastFilter("ot.cl_id", contrast)}
           |and ot.partition_date ='${partition_date}'
         """.stripMargin)

      /** One-row frame (`stat_date`, `metric`) counting diary exposures in `exposureTable` for `cohortView` devices. */
      def exposureCount(metric: String, cohortView: String, exposureTable: String, contrast: Boolean) = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(cid_id) as ${metric}
           |from ${exposureTable} je inner join ${cohortView}
           |on je.device_id = ${cohortView}.device_id
           |where je.cid_type = 'diary'
           |${contrastFilter("je.device_id", contrast)}
           |and je.device_id not in (select device_id from blacklist)
           |and je.stat_date ='${stat_date}'
         """.stripMargin)

      // Blacklisted devices; the temp view shadows the table of the same name.
      sc.sql("select device_id from blacklist").createOrReplaceTempView("blacklist")

      // Agency (institution) device ids
      sc.sql(
        s"""
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_day
           |WHERE partition_date >= '20180402'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
           |UNION ALL
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_month
           |WHERE partition_date >= '20171101'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
         """.stripMargin
      ).createOrReplaceTempView("agency_id")

      // Blacklist and agencies combined
      sc.sql(
        s"""
           |SELECT device_id
           |FROM blacklist
           |UNION ALL
           |SELECT device_id
           |FROM agency_id
         """.stripMargin
      ).createOrReplaceTempView("blacklist_all")

      // Old users of the day, without blacklisted / agency devices
      val device_id_oldUser = sc.sql(
        s"""
           |select distinct(om.device_id) as device_id
           |from online.ml_device_day_active_status om left join blacklist_all
           |on om.device_id = blacklist_all.device_id
           |where om.active_type = '4'
           |and om.first_channel_source_type not in (${ExcludedChannels})
           |and om.partition_date ='${partition_date}'
           |and blacklist_all.device_id is null
         """.stripMargin)
      device_id_oldUser.createOrReplaceTempView("device_id_old")
      device_id_oldUser.show()

      val clk_count_oldUser_Contrast_a = clickCount("clk_count_oldUser_Contrast_a", "device_id_old", DiaryCardClick, contrast = true)
      clk_count_oldUser_Contrast_a.show()
      val clk_count_oldUser_Contrast_b = clickCount("clk_count_oldUser_Contrast_b", "device_id_old", VideoCardClick, contrast = true)
      val imp_count_oldUser_Contrast = exposureCount("imp_count_oldUser_Contrast", "device_id_old", "data_feed_exposure", contrast = true)
      val imp_count_oldUser_Contrast_precise = exposureCount("imp_count_oldUser_Contrast_precise", "device_id_old", "data_feed_exposure_precise", contrast = true)

      val clk_count_oldUser_all_a = clickCount("clk_count_oldUser_all_a", "device_id_old", DiaryCardClick, contrast = false)
      val clk_count_oldUser_all_b = clickCount("clk_count_oldUser_all_b", "device_id_old", VideoCardClick, contrast = false)
      val imp_count_oldUser_all = exposureCount("imp_count_oldUser_all", "device_id_old", "data_feed_exposure", contrast = false)
      val imp_count_oldUser_all_precise = exposureCount("imp_count_oldUser_all_precise", "device_id_old", "data_feed_exposure_precise", contrast = false)

      // New-user CTR statistics.
      // The view must exist before the new-user queries below are built.
      // NOTE(review): unlike the old-user cohort, this one is not anti-joined with blacklist_all.
      sc.sql(
        s"""
           |select distinct(device_id) as device_id
           |from online.ml_device_day_active_status
           |where (active_type = '1' or active_type = '2')
           |and first_channel_source_type not in (${ExcludedChannels})
           |and partition_date ='${partition_date}'
         """.stripMargin
      ).createOrReplaceTempView("device_id_new")

      val clk_count_newUser_Contrast_a = clickCount("clk_count_newUser_Contrast_a", "device_id_new", DiaryCardClick, contrast = true)
      val clk_count_newUser_Contrast_b = clickCount("clk_count_newUser_Contrast_b", "device_id_new", VideoCardClick, contrast = true)
      val imp_count_newUser_Contrast = exposureCount("imp_count_newUser_Contrast", "device_id_new", "data_feed_exposure", contrast = true)
      val imp_count_newUser_Contrast_precise = exposureCount("imp_count_newUser_Contrast_precise", "device_id_new", "data_feed_exposure_precise", contrast = true)

      val clk_count_newUser_all_a = clickCount("clk_count_newUser_all_a", "device_id_new", DiaryCardClick, contrast = false)
      val clk_count_newUser_all_b = clickCount("clk_count_newUser_all_b", "device_id_new", VideoCardClick, contrast = false)
      val imp_count_newUser_all = exposureCount("imp_count_newUser_all", "device_id_new", "data_feed_exposure", contrast = false)
      val imp_count_newUser_all_precise = exposureCount("imp_count_newUser_all_precise", "device_id_new", "data_feed_exposure_precise", contrast = false)

      // Every frame holds a single row for stat_date, so the joins yield one wide row.
      val result1 = clk_count_oldUser_Contrast_a.join(clk_count_oldUser_Contrast_b, "stat_date")
        .join(imp_count_oldUser_Contrast, "stat_date")
        .join(clk_count_oldUser_all_a, "stat_date")
        .join(clk_count_oldUser_all_b, "stat_date")
        .join(imp_count_oldUser_all, "stat_date")
        .join(clk_count_newUser_Contrast_a, "stat_date")
        .join(clk_count_newUser_Contrast_b, "stat_date")
        .join(imp_count_newUser_Contrast, "stat_date")
        .join(clk_count_newUser_all_a, "stat_date")
        .join(clk_count_newUser_all_b, "stat_date")
        .join(imp_count_newUser_all, "stat_date")
        .join(imp_count_oldUser_Contrast_precise, "stat_date")
        .join(imp_count_oldUser_all_precise, "stat_date")
        .join(imp_count_newUser_Contrast_precise, "stat_date")
        .join(imp_count_newUser_all_precise, "stat_date")
      result1.show()

      // SECURITY NOTE(review): root credentials are hard-coded in this URL. They should be
      // moved to configuration (a disabled earlier variant passed the key "jerry.jdbcuri")
      // and the password rotated.
      val jdbcUri = "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"

      println("开始写入")
      GmeiConfig.writeToJDBCTable(jdbcUri, result1, table="on_click_diary_card", SaveMode.Append)
      println("写入完成")
    }
  }

}