package com.gmei

import java.io.Serializable

import com.gmei.WeafareStat.{defaultParams, parser}
import org.apache.spark.sql.{DataFrame, SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams
//import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._


object strategy_other {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the database environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text(s"the date you used")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"| --env ${defaultParams.env}"
    )
  }

  // Daily click/exposure statistics of new users (contrast group vs. all)
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")

      import sc.implicits._

      //val stat_date = GmeiConfig.getMinusNDate(1)
      //println(param.date)
      val partition_date = param.date.replace("-", "")

      // New-user devices active on the given day, excluding promotion channels
      val device_id_newUser = sc.sql(
        s"""
           |select distinct(device_id) as device_id
           |from online.ml_device_day_active_status
           |where active_type != '4'
           |and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
           |  ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
           |  ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
           |  ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
           |  ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
           |  ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
           |  ,'promotion_shike','promotion_julang_jl03')
           |and partition_date = '${partition_date}'
         """.stripMargin
      )
      device_id_newUser.show()
      device_id_newUser.createOrReplaceTempView("device_id_new")

      // Clicks of new users in the contrast group (device_id ending in 1)
      val clk_count_newUser_Contrast = sc.sql(
        s"""
           |select '${param.date}' as stat_date, count(cid_id) as clk_count_newUser_Contrast
           |from data_feed_click jd inner join device_id_new
           |on jd.device_id = device_id_new.device_id
           |where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
           |and jd.device_id regexp '1$$'
           |and jd.device_id not in (select device_id from blacklist)
           |and jd.stat_date = '${param.date}'
         """.stripMargin
      )

      // Exposures of new users in the contrast group
      val imp_count_newUser_Contrast = sc.sql(
        s"""
           |select '${param.date}' as stat_date, count(cid_id) as imp_count_newUser_Contrast
           |from data_feed_exposure je inner join device_id_new
           |on je.device_id = device_id_new.device_id
           |where je.cid_type = 'diary'
           |and je.device_id regexp '1$$'
           |and je.device_id not in (select device_id from blacklist)
           |and je.stat_date = '${param.date}'
         """.stripMargin
      )

      // Clicks of all new users
      val clk_count_newUser_all = sc.sql(
        s"""
           |select '${param.date}' as stat_date, count(cid_id) as clk_count_newUser_all
           |from data_feed_click jd inner join device_id_new
           |on jd.device_id = device_id_new.device_id
           |where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
           |and jd.device_id not in (select device_id from blacklist)
           |and jd.stat_date = '${param.date}'
         """.stripMargin
      )

      // Exposures of all new users
      val imp_count_newUser_all = sc.sql(
        s"""
           |select '${param.date}' as stat_date, count(cid_id) as imp_count_newUser_all
           |from data_feed_exposure je inner join device_id_new
           |on je.device_id = device_id_new.device_id
           |where je.cid_type = 'diary'
           |and je.device_id not in (select device_id from blacklist)
           |and je.stat_date = '${param.date}'
         """.stripMargin
      )

      val result3 = clk_count_newUser_Contrast.join(imp_count_newUser_Contrast, "stat_date")
        .join(clk_count_newUser_all, "stat_date")
        .join(imp_count_newUser_all, "stat_date")
      result3.show()
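
      // Illustrative sketch (not in the original job): with clicks and
      // exposures joined on stat_date, the click-through rates themselves
      // could be derived as extra columns, e.g.
      val ctr_newUser = result3
        .withColumn("ctr_newUser_Contrast",
          col("clk_count_newUser_Contrast") / col("imp_count_newUser_Contrast"))
        .withColumn("ctr_newUser_all",
          col("clk_count_newUser_all") / col("imp_count_newUser_all"))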

      GmeiConfig.writeToJDBCTable(result3, "Recommendation_strategy_newUser", SaveMode.Append)

    }

  }

}


// The following analyzes evaluation metrics of the recommendation system
object diary_exposure {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the database environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text(s"the date you used")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"| --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "eagle", tableName = "src_mimas_prod_api_diary")
      // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")

      val mimas_url = "jdbc:mysql://rr-m5et21lafq1677pid.mysql.rds.aliyuncs.com/mimas_prod"
      val mimas_user = "mimas"
      val mimas_password = "GJL3UJe1Ck9ggL6aKnZCq4cRvM"
      val zhengxing_url = "jdbc:mysql://rdsfewzdmf0jfjp9un8xj.mysql.rds.aliyuncs.com/zhengxing"
      val zhengxing_user = "work"
      val zhengxing_password = "BJQaT9VzDcuPBqkd"

      // Load a MySQL table over JDBC, register it as a temp view, then run the
      // given sql against it; falls back to an empty DataFrame if the query fails.
      def mysql_df(spark: SparkSession, url: String, table: String,
                   user: String, password: String, sql: String): DataFrame = {
        val jdbcDF = spark.read
          .format("jdbc")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("url", url)
          .option("dbtable", table)
          .option("user", user)
          .option("password", password)
          .load()
        jdbcDF.createOrReplaceTempView(table)
        try { spark.sql(sql) } catch { case _: Throwable => spark.emptyDataFrame }
      }
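
      // Usage sketch for mysql_df (hypothetical query, for illustration only):
      //   val sample = mysql_df(sc, mimas_url, "api_diary_tags", mimas_user,
      //     mimas_password, "select diary_id, tag_id from api_diary_tags limit 10")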

      val stat_date = GmeiConfig.getMinusNDate(1)
      val partition_date = stat_date.replace("-", "")

      // Agency device ids (hospital devices with abnormally high pv ratio)
      val agency_id = sc.sql(
        s"""
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_day
           |WHERE partition_date >= '20180402'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
           |UNION ALL
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_month
           |WHERE partition_date >= '20171101'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
         """.stripMargin
      )
      agency_id.show()
      agency_id.createOrReplaceTempView("agency_id")

      // Total exposures of the homepage featured feed for the day,
      // excluding agencies and blacklisted devices
      val imp_count = sc.sql(
        s"""
           |select count(cid_id) as imp_num
           |from data_feed_exposure de left join agency_id
           |on de.device_id = agency_id.device_id
           |where de.cid_type = 'diary'
           |and agency_id.device_id is null
           |and de.stat_date = '${stat_date}'
           |and de.device_id not in (select distinct(device_id) from blacklist)
         """.stripMargin
      )
      imp_count.show()

      // Diary ids in the exposure table, excluding agencies and blacklisted devices
      val diary_id_temp = sc.sql(
        s"""
           |select cid_id as diary_id
           |from data_feed_exposure de left join agency_id
           |on de.device_id = agency_id.device_id
           |where de.cid_type = 'diary'
           |and agency_id.device_id is null
           |and de.stat_date = '${stat_date}'
           |and de.device_id not in (select distinct(device_id) from blacklist)
         """.stripMargin
      )
      diary_id_temp.createOrReplaceTempView("diary_id_temp")

      // Map each exposed diary to its city tag (tag_type = 4) via MySQL
      val diary_id = diary_id_temp.rdd.map(x => x(0).toString).collect()
      val cid_tag =
        s"""
           |select diary_id,tag_id from api_diary_tags
           |where diary_id in (${diary_id.map(x => s"'$x'").mkString(",")})
         """.stripMargin
      val cid_city = mysql_df(sc, mimas_url, "api_diary_tags", mimas_user, mimas_password, cid_tag)

      val tag_list = cid_city.select("tag_id").collect().map(x => x(0).toString).distinct
      val tag_city =
        s"""
           |select id,name from api_tag where tag_type = 4
           |and id in (${tag_list.map(x => s"'$x'").mkString(",")})
         """.stripMargin
      val city_df = mysql_df(sc, zhengxing_url, "api_tag", zhengxing_user, zhengxing_password, tag_city)
        .na.drop().withColumnRenamed("id", "tag_id")

      val df_cid_city = cid_city.join(city_df, Seq("tag_id"), "left_outer").na.drop()
        .drop("tag_id")
      val final_cid_city = diary_id_temp.join(df_cid_city, Seq("diary_id"), "left_outer")
      final_cid_city.show()
      // Exposure count per city
      val df1 = final_cid_city.groupBy("name").count().orderBy(desc("count"))

      // Ids of diaries rated 3.5 stars or above
      val diary_id_temp2 = sc.sql(
        s"""
           |select id as diary_id
           |from src_mimas_prod_api_diary
           |where content_level >= 3.5
           |and doctor_id is not null
         """.stripMargin
      )
      diary_id_temp2.createOrReplaceTempView("diary_id_temp2")

      val diary_id2 = diary_id_temp2.rdd.map(x => x(0).toString).collect()
      val cid_tag2 =
        s"""
           |select diary_id,tag_id from api_diary_tags
           |where diary_id in (${diary_id2.map(x => s"'$x'").mkString(",")})
         """.stripMargin
      val cid_city2 = mysql_df(sc, mimas_url, "api_diary_tags", mimas_user, mimas_password, cid_tag2)

      val tag_list2 = cid_city2.select("tag_id").collect().map(x => x(0).toString).distinct
      val tag_city2 =
        s"""
           |select id,name from api_tag where tag_type = 4
           |and id in (${tag_list2.map(x => s"'$x'").mkString(",")})
         """.stripMargin
      val city_df2 = mysql_df(sc, zhengxing_url, "api_tag", zhengxing_user, zhengxing_password, tag_city2)
        .na.drop().withColumnRenamed("id", "tag_id")

      val df_cid_city2 = cid_city2.join(city_df2, Seq("tag_id"), "left_outer").na.drop()
        .drop("tag_id")
      val final_cid_city2 = diary_id_temp2.join(df_cid_city2, Seq("diary_id"), "left_outer")
      final_cid_city2.show()
      // Count of 3.5+ star diaries per city
      val df2 = final_cid_city2.groupBy("name").count().orderBy(desc("count"))

      // Per-city comparison of exposures vs. high-quality diaries
      val df3 = df1.join(df2, Seq("name"), "left_outer")
      df3.show(400)

      //GmeiConfig.writeToJDBCTable(diary_id, "dairy_id_queue", SaveMode.Append)

    }

  }

}
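
// A possible invocation for this second job (class name taken from this file;
// jar path assumed to mirror the note above):
//   spark-submit --class com.gmei.diary_exposure \
//     ./target/scala-2.11/feededa-assembly-0.1.jar --env dev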