package com.gmei

import java.io._

import com.gmei.WeafareStat.{defaultParams, parser}
import com.gmei.lib.AbstractParams
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SaveMode
//import org.apache.spark.sql.{SaveMode, TiContext}
import scopt.OptionParser

import scala.util.parsing.json._

object temp_analysis {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text("the database environment to use")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text("the date to use")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"| --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2
      // val ti = new TiContext(sc)
      sc.sql("use jerry_prod")

      import sc.implicits._

      // val stat_date = GmeiConfig.getMinusNDate(1)
      val stat_date = param.date
      val partition_date = stat_date.replace("-", "")

      // Devices flagged as agency/spam traffic by the hospital PV models
      val agency_id = sc.sql(
        s"""
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_day
           |WHERE partition_date >= '20180402'
           |AND partition_date <= '20181203'
           |AND pv_ratio >= 0.95
           |UNION ALL
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_month
           |WHERE partition_date >= '20171101'
           |AND partition_date <= '20181203'
           |AND pv_ratio >= 0.95
         """.stripMargin
      )
      agency_id.createOrReplaceTempView("agency_id")

      val blacklist_id = sc.sql(
        s"""
           |SELECT device_id
           |from blacklist
         """.stripMargin
      )
      blacklist_id.createOrReplaceTempView("blacklist_id")

      // Union of agency and blacklisted devices; a left join against this view
      // plus an "is null" filter excludes them from the stats below
      val final_id = sc.sql(
        s"""
           |select device_id
           |from agency_id
           |UNION ALL
           |select device_id
           |from blacklist_id
         """.stripMargin
      )
      final_id.createOrReplaceTempView("final_id")

      // Daily new users (active_type 1 or 2), excluding promotion channels
      // and the devices collected in final_id
      val device_id_newUser = sc.sql(
        s"""
           |select distinct(oms.device_id) as device_id
           |from online.ml_device_day_active_status oms left join final_id
           |on oms.device_id=final_id.device_id
           |where (oms.active_type = '2' or oms.active_type='1')
           |and oms.first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
           |  ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
           |  ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
           |  ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
           |  ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
           |  ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
           |  ,'promotion_shike','promotion_julang_jl03','promotion_zuimei')
           |and oms.partition_date ='${partition_date}'
           |and final_id.device_id is null
         """.stripMargin
      )
      device_id_newUser.createOrReplaceTempView("device_id_new")

      // Diary-card clicks on the home "精选" tab made by those new users
      val diary_clk_new = sc.sql(
        s"""
           |select ov.partition_date,ov.cl_id as device_id,ov.params['diary_id'] as diary_id
           |from online.tl_hdfs_maidian_view ov inner join device_id_new
           |on ov.cl_id = device_id_new.device_id
           |where ov.action = 'on_click_diary_card'
           |and ov.params['tab_name'] = '精选'
           |and ov.params['page_name'] = 'home'
           |and ov.partition_date='${partition_date}'
         """.stripMargin
      )
      diary_clk_new.show(80)

      GmeiConfig.writeToJDBCTable(
        "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",
        diary_clk_new, table = "temp", SaveMode.Append)
      println("write finished")
    }
  }
}
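// --- Hedged refactoring sketch (not part of the original pipeline) ---
// The agency_id / blacklist_id / final_id views above are rebuilt verbatim in
// several objects below. A shared helper could register them once per session.
// `DeviceFilterSketch`, `registerFinalIdView`, and the `endDate` parameter are
// hypothetical names; the sketch assumes `sc` is the same SparkSession used
// throughout this file.
object DeviceFilterSketch {
  import org.apache.spark.sql.SparkSession

  def registerFinalIdView(sc: SparkSession, endDate: String): Unit = {
    // Agency/spam devices from the daily and monthly hospital PV models
    sc.sql(
      s"""
         |SELECT DISTINCT(cl_id) as device_id
         |FROM online.ml_hospital_spam_pv_day
         |WHERE partition_date >= '20180402' AND partition_date <= '$endDate'
         |AND pv_ratio >= 0.95
         |UNION ALL
         |SELECT DISTINCT(cl_id) as device_id
         |FROM online.ml_hospital_spam_pv_month
         |WHERE partition_date >= '20171101' AND partition_date <= '$endDate'
         |AND pv_ratio >= 0.95
       """.stripMargin
    ).createOrReplaceTempView("agency_id")

    sc.sql("SELECT device_id FROM blacklist").createOrReplaceTempView("blacklist_id")

    // Combined exclusion view, same name as the per-job version above
    sc.sql(
      """
        |select device_id from agency_id
        |UNION ALL
        |select device_id from blacklist_id
      """.stripMargin
    ).createOrReplaceTempView("final_id")
  }
}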
object ARPU_COM {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text("the database environment to use")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text("the date to use")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"| --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2
      // val ti = new TiContext(sc)
      sc.sql("use jerry_prod")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      // ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")

      import sc.implicits._

      val stat_date = GmeiConfig.getMinusNDate(1)
      val partition_date = stat_date.replace("-", "")

      val agency_id = sc.sql(
        s"""
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_day
           |WHERE partition_date >= '20180402'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
           |UNION ALL
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_month
           |WHERE partition_date >= '20171101'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
         """.stripMargin
      )
      agency_id.createOrReplaceTempView("agency_id")

      val blacklist_id = sc.sql(
        s"""
           |SELECT device_id
           |from blacklist
         """.stripMargin
      )
      blacklist_id.createOrReplaceTempView("blacklist_id")

      val final_id = sc.sql(
        s"""
           |select device_id
           |from agency_id
           |UNION ALL
           |select device_id
           |from blacklist_id
         """.stripMargin
      )
      final_id.createOrReplaceTempView("final_id")

      // Total GMV (pay_all) and number of paying devices (consum_num) for
      // November 2018, excluding agency/blacklisted devices
      val diary_clk_all = sc.sql(
        s"""
           |select sum(md.gengmei_price) as pay_all,count(distinct(md.device_id)) as consum_num
           |from online.ml_meigou_order_detail md left join final_id
           |on md.device_id = final_id.device_id
           |where md.status= 2
           |and final_id.device_id is null
           |and md.partition_date = '20181218'
           |and md.pay_time is not null
           |and md.pay_time >= '2018-11-01'
           |and md.pay_time <= '2018-11-30'
         """.stripMargin
      )
      diary_clk_all.show(80)

      // Monthly active devices for the same period, excluding promotion channels
      val active_num = sc.sql(
        s"""
           |select count(distinct(device_id)) as active_num
           |from online.ml_device_month_active_status
           |where partition_date = '20181130'
           |and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
           |  ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
           |  ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
           |  ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
           |  ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
           |  ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
           |  ,'promotion_shike','promotion_julang_jl03')
         """.stripMargin
      )
      active_num.show(80)
    }
  }
}
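// --- Hedged sketch (not part of the original job) ---
// ARPU_COM above prints total GMV (pay_all) and monthly active devices
// (active_num) separately; combining them gives ARPU = pay_all / active_num.
// `ArpuSketch` is a hypothetical helper; since both inputs are single-row
// frames, a crossJoin is a safe way to put the columns side by side.
object ArpuSketch {
  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.col

  def arpu(payAll: DataFrame, activeNum: DataFrame): DataFrame =
    payAll.crossJoin(activeNum)
      .withColumn("arpu", col("pay_all") / col("active_num"))
}
// Possible usage inside ARPU_COM.main:
//   ArpuSketch.arpu(diary_clk_all, active_num).show()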
object hospital_gengmei {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text("the database environment to use")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text("the date to use")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"| --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2
      // val ti = new TiContext(sc)
      sc.sql("use jerry_prod")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      // ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")

      import sc.implicits._

      val stat_date = GmeiConfig.getMinusNDate(1)
      val partition_date = stat_date.replace("-", "")

      // Hospital dimension snapshot; note the hardcoded partition date
      val hospital_gengmei = sc.sql(
        s"""
           |SELECT id,name,location,city_id
           |FROM online.tl_hdfs_hospital_view
           |WHERE partition_date = '20181219'
         """.stripMargin
      )
      hospital_gengmei.show()
      GmeiConfig.writeToJDBCTable(hospital_gengmei, "hospital_gengmei", SaveMode.Append)
    }
  }
}
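// --- Hedged sketch (not part of the original job) ---
// After the Append above, the target table can be read back to sanity-check
// the row count. The JDBC URI and credentials mirror the ones used elsewhere
// in this file; whether they match the default target of the 3-argument
// writeToJDBCTable overload is an assumption. `HospitalWriteCheckSketch` is
// a hypothetical name.
object HospitalWriteCheckSketch {
  import org.apache.spark.sql.SparkSession

  def rowCount(sc: SparkSession): Long = {
    val props = new java.util.Properties()
    props.put("user", "root")
    props.put("password", "3SYz54LS9#^9sBvC")
    sc.read
      .jdbc("jdbc:mysql://172.16.40.158:4000/jerry_prod", "hospital_gengmei", props)
      .count()
  }
}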
object meigou_xiaofei_renshu {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text("the database environment to use")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text("the date to use")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"| --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2
      // val ti = new TiContext(sc)
      sc.sql("use jerry_prod")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      // ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")

      import sc.implicits._

      // val stat_date = GmeiConfig.getMinusNDate(1)
      val stat_date = param.date
      val partition_date = stat_date.replace("-", "")

      val agency_id = sc.sql(
        s"""
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_day
           |WHERE partition_date >= '20180402'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
           |UNION ALL
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_month
           |WHERE partition_date >= '20171101'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
         """.stripMargin
      )
      agency_id.createOrReplaceTempView("agency_id")

      val blacklist_id = sc.sql(
        s"""
           |SELECT device_id
           |from blacklist
         """.stripMargin
      )
      blacklist_id.createOrReplaceTempView("blacklist_id")

      val final_id = sc.sql(
        s"""
           |select device_id
           |from agency_id
           |UNION ALL
           |select device_id
           |from blacklist_id
         """.stripMargin
      )
      final_id.createOrReplaceTempView("final_id")

      // Earlier version of the query, filtering by the device blacklist
      // instead of the order-level spam model:
      // val meigou_price = sc.sql(
      //   s"""
      //      |select md.user_id,sum(md.gengmei_price) as pay_all
      //      |from online.ml_meigou_order_detail md left join final_id
      //      |on md.device_id = final_id.device_id
      //      |where md.status= 2
      //      |and final_id.device_id is null
      //      |and md.partition_date = '20181223'
      //      |and md.pay_time is not null
      //      |and md.validate_time>'2017-01-01 00:00:00.0'
      //      |group by md.user_id
      //      |order by sum(md.gengmei_price)
      //    """.stripMargin
      // )
      // meigou_price.show(80)

      // Per-user GMV since 2017, excluding orders flagged by the spam model
      val meigou_price = sc.sql(
        s"""
           |select md.user_id,sum(md.gengmei_price) as pay_all
           |from online.ml_meigou_order_detail md
           |left join
           |(
           |  SELECT
           |  order_id
           |  FROM mining.ml_order_spam_recognize
           |  WHERE partition_date='20181223' AND
           |  self_support=0 AND dayBitsGetW1(predict_result,'20181223')=0
           |)spam
           |on md.order_id = spam.order_id
           |where md.status= 2
           |and spam.order_id is null
           |and md.partition_date = '20181223'
           |and md.pay_time is not null
           |and md.validate_time>'2017-01-01 00:00:00.0'
           |group by md.user_id
           |order by sum(md.gengmei_price)
         """.stripMargin
      )
      // NOTE: both the show and the JDBC write are commented out in the
      // original, so this job currently computes meigou_price without
      // emitting it anywhere.
      // meigou_price.show(80)
      // GmeiConfig.writeToJDBCTable(meigou_price, "meigou_price", SaveMode.Overwrite)
    }
  }
}
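// --- Hedged sketch (not part of the original job) ---
// meigou_price above is a per-user GMV frame ordered by pay_all. If the goal
// is a spend distribution rather than the full ranking, approxQuantile gives
// the quartiles cheaply; `MeigouQuantileSketch` is a hypothetical name and
// the 0.01 relative error is an arbitrary choice.
object MeigouQuantileSketch {
  import org.apache.spark.sql.DataFrame

  // Returns the 25th, 50th, and 75th percentiles of per-user spend
  def spendQuartiles(meigouPrice: DataFrame): Array[Double] =
    meigouPrice.stat.approxQuantile("pay_all", Array(0.25, 0.5, 0.75), 0.01)
}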
object alpha_ctr {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text("the database environment to use")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text("the date to use")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"| --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2
      // val ti = new TiContext(sc)
      sc.sql("use jerry_prod")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      // ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")

      import sc.implicits._

      val stat_date = GmeiConfig.getMinusNDate(1)
      // val stat_date = param.date
      val partition_date = stat_date.replace("-", "")

      // Clicks on feed topic cards in the "recommend" home tab
      val click_count_recommend = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(*) as click_count_recommend
           |from bl.bl_alpha_et_mg_maidianlog_inc_d
           |where params['tab_name']='recommend'
           |and params['page_name']='home'
           |and type='on_click_feed_topic_card'
           |and partition_day='${partition_date}'
         """.stripMargin
      )
      click_count_recommend.show()

      // Clicks on feed topic cards in the "focus" home tab
      val click_count_focus = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(*) as click_count_focus
           |from bl.bl_alpha_et_mg_maidianlog_inc_d
           |where params['tab_name']='focus'
           |and params['page_name']='home'
           |and type='on_click_feed_topic_card'
           |and partition_day='${partition_date}'
         """.stripMargin
      )
      click_count_focus.show()

      // Parse a JSON-array string (params['exposure_cards']) and return the
      // number of cards it contains; rows that fail to parse count as 0
      def parse_json(str: String): Int = {
        var t = List[Map[String, Any]]()
        val result = JSON.parseFull(str)
        result match {
          case Some(b: List[Map[String, Any]]) => t = t ++ b
          case None => println("Parsing failed")
          case other => println("Unknown data structure: " + other)
        }
        t.size
      }

      val exposure_cards = sc.sql(
        s"""
           |select params['exposure_cards'] as exposure_cards
           |from bl.bl_alpha_et_mg_maidianlog_inc_d
           |where params['tab_name'] = 'recommend'
           |and params['page_name'] = 'home'
           |and type = 'page_precise_exposure'
           |and partition_day='${partition_date}'
         """.stripMargin
      )
      // Sum card counts across all exposure events; the column name keeps the
      // original "expoure" spelling to match the existing alpha_ctr table
      val a = exposure_cards.rdd.map(row => row(0).toString).map(row => parse_json(row)).collect().sum
      val result1 = List((stat_date, a))
      val df1 = sc.createDataFrame(result1).toDF("stat_date", "expoure_count_recommend")

      val exposure_cards2 = sc.sql(
        s"""
           |select params['exposure_cards'] as exposure_cards
           |from bl.bl_alpha_et_mg_maidianlog_inc_d
           |where params['tab_name'] = 'focus'
           |and params['page_name'] = 'home'
           |and type = 'page_precise_exposure'
           |and partition_day='${partition_date}'
         """.stripMargin
      )
      val b = exposure_cards2.rdd.map(row => row(0).toString).map(row => parse_json(row)).collect().sum
      val result2 = List((stat_date, b))
      val df2 = sc.createDataFrame(result2).toDF("stat_date", "expoure_count_focus")

      // One row per stat_date with click and exposure counts for both tabs
      val result = click_count_recommend.join(click_count_focus, "stat_date")
        .join(df1, "stat_date")
        .join(df2, "stat_date")

      // GmeiConfig.writeToJDBCTable(result, "alpha_ctr", SaveMode.Append)
      // GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",result, table="alpha_ctr",SaveMode.Append)
      println("start writing")
      // GmeiConfig.writeToJDBCTable("jerry.jdbcuri",result, table="alpha_ctr",SaveMode.Append)
      // GmeiConfig.writeToJDBCTable(result, "alpha_ctr", SaveMode.Append)
      GmeiConfig.writeToJDBCTable(
        "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",
        result, table = "alpha_ctr", SaveMode.Append)
      println("write finished")

      // Daily active device count
      val device_num_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(DISTINCT(device_id)) as device_num
           |from ml.ML_ALPHA_C_CT_DV_DEVICE_DIMEN_D
           |where partition_day='${partition_date}'
           |and is_today_active='true'
         """.stripMargin
      )
      device_num_count.show()

      // Average session duration per active device
      val duration_device = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,sum(user_duration)/count(DISTINCT(device_id)) as device_duration
           |from ml.ML_ALPHA_C_CT_DV_DEVICE_INDIC_INC_D
           |WHERE partition_day='${partition_date}'
           |and open_times!="0"
         """.stripMargin
      )

      val result3 = device_num_count.join(duration_device, "stat_date")
      // GmeiConfig.writeToJDBCTable(result3, "alpha_duration", SaveMode.Append)
      GmeiConfig.writeToJDBCTable(
        "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",
        result3, table = "alpha_duration", SaveMode.Append)
      println("write finished")
    }
  }
}
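// --- Hedged sketch (not part of the original job) ---
// The alpha_ctr row written above holds raw click and exposure counts; the
// CTRs themselves could be derived before (or instead of) writing. Column
// names follow the frames above, including their original "expoure" spelling;
// `AlphaCtrSketch` is a hypothetical name.
object AlphaCtrSketch {
  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.col

  def withCtr(result: DataFrame): DataFrame =
    result
      .withColumn("ctr_recommend", col("click_count_recommend") / col("expoure_count_recommend"))
      .withColumn("ctr_focus", col("click_count_focus") / col("expoure_count_focus"))
}
// Possible usage inside alpha_ctr.main: AlphaCtrSketch.withCtr(result).show()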
// Topic-related question statistics
object copy_database {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text("the database environment to use")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text("the date to use")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"| --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2
      // val ti = new TiContext(sc)
      sc.sql("use jerry_prod")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      // ti.tidbMapTable(dbName = "jerry_test", tableName = "tl_hdfs_wiki_item_tag_view")
      // ti.tidbMapTable(dbName = "jerry_test", tableName = "Knowledge_network")
      // ti.tidbMapTable(dbName = "eagle", tableName = "src_mimas_prod_api_diary")

      import sc.implicits._

      val stat_date = GmeiConfig.getMinusNDate(1)
      // val stat_date = param.date
      val partition_date = stat_date.replace("-", "")

      // Join wiki item tags with the knowledge network and attach level-2 tag names
      val new_data = sc.sql(
        s"""
           |select d.level2_id,d.level2_name,c.item_id,c.tag_id,c.id,c.name,c.treatment_method,c.price_min,c.price_max,c.treatment_time,c.maintain_time,c.recover_time
           |from online.bl_tag_hierarchy_detail d
           |inner join
           |(select a.item_id,a.tag_id,b.id,b.name,b.treatment_method,b.price_min,b.price_max,b.treatment_time,b.maintain_time,b.recover_time
           |from online.tl_hdfs_wiki_item_tag_view a
           |inner join Knowledge_network b
           |on a.item_id=b.id
           |where a.partition_date='${partition_date}') c
           |on d.id=c.tag_id
           |where d.partition_date='${partition_date}'
         """.stripMargin
      )

      GmeiConfig.writeToJDBCTable(
        "jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",
        new_data, "train_Knowledge_network_data", SaveMode.Overwrite)
    }
  }
}
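// --- Hedged refactoring sketch (not part of the original file) ---
// Every object in this file repeats the same Params case class, OptionParser,
// and session setup. A shared trait could hold that boilerplate once;
// `SparkJobSketch` and `runJob` are hypothetical names, and the sketch assumes
// GmeiConfig.setup / getSparkSession behave exactly as used above.
trait SparkJobSketch {

  case class Params(env: String = "dev", date: String = "2018-08-01")
    extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text("the database environment to use")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text("the date to use")
      .action((x, c) => c.copy(date = x))
  }

  // Each job implements only its queries; argument parsing and session setup
  // happen here once.
  def runJob(args: Array[String])(body: (org.apache.spark.sql.SparkSession, Params) => Unit): Unit = {
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val sc = GmeiConfig.getSparkSession()._2
      sc.sql("use jerry_prod")
      body(sc, param)
    }
  }
}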