package com.gmei import java.io.Serializable import com.gmei.WeafareStat.{defaultParams, parser} import org.apache.spark.sql.{SaveMode, TiContext} import org.apache.log4j.{Level, Logger} import scopt.OptionParser import com.gmei.lib.AbstractParams import java.io._ object temp_analysis { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = "2018-08-01" ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("WeafareStat") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String] ("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 val ti = new TiContext(sc) ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click") ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist") ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure") ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table") import sc.implicits._ val stat_date = GmeiConfig.getMinusNDate(1) //println(param.date) val partition_date = stat_date.replace("-","") val agency_id = sc.sql( s""" |SELECT DISTINCT(cl_id) as device_id |FROM online.ml_hospital_spam_pv_day |WHERE partition_date >= '20180402' |AND partition_date <= '20181203' |AND pv_ratio >= 0.95 |UNION ALL |SELECT DISTINCT(cl_id) as device_id |FROM online.ml_hospital_spam_pv_month |WHERE partition_date >= '20171101' |AND partition_date <= '20181203' |AND pv_ratio >= 0.95 """.stripMargin ) agency_id.createOrReplaceTempView("agency_id") // //每日新用户 // val device_id_newUser = sc.sql( // s""" // |select distinct(device_id) as device_id // |from online.ml_device_day_active_status // |where active_type != '4' // |and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3' // | ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang' // | ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1' // | ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4' // | ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100' // | ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ' // | ,'promotion_shike','promotion_julang_jl03') // |and partition_date ='${partition_date}' // """.stripMargin // ) // device_id_newUser.createOrReplaceTempView("device_id_new") val blacklist_id = sc.sql( s""" |SELECT device_id |from blacklist """.stripMargin ) blacklist_id.createOrReplaceTempView("blacklist_id") val final_id = sc.sql( s""" |select device_id |from agency_id |UNION ALL |select device_id |from blacklist_id """.stripMargin ) final_id.createOrReplaceTempView("final_id") val diary_clk_all = sc.sql( s""" |select ov.partition_date,count(ov.cl_id) as clk_num,count(distinct(ov.cl_id)),count(ov.cl_id)/count(distinct(ov.cl_id)) |from online.tl_hdfs_maidian_view ov left join final_id |on ov.cl_id = final_id.device_id |where ov.action = "page_view" |and params['page_name']="diary_detail" |and ov.cl_id != "NULL" |and ov.partition_date >='20181201' |and final_id.device_id is null |group by ov.partition_date |order by ov.partition_date """.stripMargin ) diary_clk_all.show(80) //日记本点击 val referrer=List("about_me_message_list","all_case_service_comment","all_cases","diary_detail","diary_list" ,"diary_listof_related_service","answer_detail","community_home","conversation_detail","create_diary_title","diary_listof_related_service", "doctor_all_cases","hospital_all_cases","my_favor","my_order","order_detail","personal_store_diary_list","received_votes", "topic_detail","welfare_detail","welfare_list","welfare_special","wiki_detail","zone_detail", "expert_detail","free_activity_detail","home","message_home","my_diary","organization_detail","other_homepage","question_detail", "search_result_diary","search_result_more","welfare_detail","zone_v3") for( a <- referrer ){ val diary_clk_temp = sc.sql( s""" |select ov.partition_date,count(ov.cl_id) as clk_num,count(distinct(ov.cl_id)),count(ov.cl_id)/count(distinct(ov.cl_id)) |from online.tl_hdfs_maidian_view ov left join final_id |on ov.cl_id = final_id.device_id |where ov.action = "page_view" |and params['page_name']="diary_detail" |and params['referrer']='${a}' |and ov.cl_id != "NULL" |and ov.partition_date >='20181201' |and final_id.device_id is null |group by ov.partition_date |order by ov.partition_date """.stripMargin ) println("来源:",a) diary_clk_temp.show(80) } //5.登录人数 val log_device_temp = sc.sql( s""" |select oe.stat_date,count(distinct(oe.device_id)) as log_num |from data_feed_exposure oe left join final_id |on oe.device_id = final_id.device_id |and oe.stat_date >='2018-11-01' |and final_id.device_id is null |group by oe.stat_date |order by oe.stat_date """.stripMargin ) println("登录人数统计:") log_device_temp.show(80) } } } object ARPU_COM { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = "2018-08-01" ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("WeafareStat") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String] ("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 val ti = new TiContext(sc) ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click") ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist") ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure") ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table") import sc.implicits._ val stat_date = GmeiConfig.getMinusNDate(1) //println(param.date) val partition_date = stat_date.replace("-","") val agency_id = sc.sql( s""" |SELECT DISTINCT(cl_id) as device_id |FROM online.ml_hospital_spam_pv_day |WHERE partition_date >= '20180402' |AND partition_date <= '${partition_date}' |AND pv_ratio >= 0.95 |UNION ALL |SELECT DISTINCT(cl_id) as device_id |FROM online.ml_hospital_spam_pv_month |WHERE partition_date >= '20171101' |AND partition_date <= '${partition_date}' |AND pv_ratio >= 0.95 """.stripMargin ) agency_id.createOrReplaceTempView("agency_id") val blacklist_id = sc.sql( s""" |SELECT device_id |from blacklist """.stripMargin ) blacklist_id.createOrReplaceTempView("blacklist_id") val final_id = sc.sql( s""" |select device_id |from agency_id |UNION ALL |select device_id |from blacklist_id """.stripMargin ) final_id.createOrReplaceTempView("final_id") val diary_clk_all = sc.sql( s""" |select sum(md.gengmei_price) as pay_all,count(distinct(md.device_id)) as consum_num |from online.ml_meigou_order_detail md left join final_id |on md.device_id = final_id.device_id |where md.status= 2 |and final_id.device_id is null |and md.partition_date = '20181218' |and md.pay_time is not null |and md.pay_time >= '2018-11-01' |and md.pay_time <= '2018-11-30' """.stripMargin ) diary_clk_all.show(80) val active_num = sc.sql( s""" |select count(distinct(device_id)) as active_num |from online.ml_device_month_active_status |where partition_date = '20181130' |and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3','wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang','js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1' ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4','TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100' ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ' ,'promotion_shike','promotion_julang_jl03') """.stripMargin ) active_num.show(80) } } } object hospital_gengmei { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = "2018-08-01" ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("WeafareStat") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String] ("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 val ti = new TiContext(sc) ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click") ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist") ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure") ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table") import sc.implicits._ val stat_date = GmeiConfig.getMinusNDate(1) //println(param.date) val partition_date = stat_date.replace("-","") val hospital_gengmei = sc.sql( s""" |SELECT id,name,location,city_id |FROM online.tl_hdfs_hospital_view |WHERE partition_date = '20181219' """.stripMargin ) hospital_gengmei.show() GmeiConfig.writeToJDBCTable(hospital_gengmei, "hospital_gengmei", SaveMode.Append) } } } object meigou_xiaofei_renshu { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = "2018-08-01" ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("WeafareStat") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String] ("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 val ti = new TiContext(sc) ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click") ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist") ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure") ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table") import sc.implicits._ val stat_date = GmeiConfig.getMinusNDate(1) //println(param.date) val partition_date = stat_date.replace("-","") val agency_id = sc.sql( s""" |SELECT DISTINCT(cl_id) as device_id |FROM online.ml_hospital_spam_pv_day |WHERE partition_date >= '20180402' |AND partition_date <= '${partition_date}' |AND pv_ratio >= 0.95 |UNION ALL |SELECT DISTINCT(cl_id) as device_id |FROM online.ml_hospital_spam_pv_month |WHERE partition_date >= '20171101' |AND partition_date <= '${partition_date}' |AND pv_ratio >= 0.95 """.stripMargin ) agency_id.createOrReplaceTempView("agency_id") val blacklist_id = sc.sql( s""" |SELECT device_id |from blacklist """.stripMargin ) blacklist_id.createOrReplaceTempView("blacklist_id") val final_id = sc.sql( s""" |select device_id |from agency_id |UNION ALL |select device_id |from blacklist_id """.stripMargin ) final_id.createOrReplaceTempView("final_id") // val meigou_price = sc.sql( // s""" // |select md.user_id,sum(md.gengmei_price) as pay_all // |from online.ml_meigou_order_detail md left join final_id // |on md.device_id = final_id.device_id // |where md.status= 2 // |and final_id.device_id is null // |and md.partition_date = '20181223' // |and md.pay_time is not null // |and md.validate_time>'2017-01-01 00:00:00.0' // |group by md.user_id // |order by sum(md.gengmei_price) // """.stripMargin // ) // meigou_price.show(80) val meigou_price = sc.sql( s""" |select md.user_id,sum(md.gengmei_price) as pay_all |from online.ml_meigou_order_detail md |left join |( | SELECT | order_id | FROM mining.ml_order_spam_recognize | WHERE partition_date='20181223' AND | self_support=0 AND dayBitsGetW1(predict_result,'20181223')=0 |)spam |on md.order_id = spam.order_id |where md.status= 2 |and spam.order_id is null |and md.partition_date = '20181223' |and md.pay_time is not null |and md.validate_time>'2017-01-01 00:00:00.0' |group by md.user_id |order by sum(md.gengmei_price) """.stripMargin ) // meigou_price.show(80) // GmeiConfig.writeToJDBCTable(meigou_price, "meigou_price", SaveMode.Overwrite) } } } object smart_rank_count { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = "2018-08-01" ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("WeafareStat") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String] ("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 val ti = new TiContext(sc) ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click") ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist") ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure") ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table") import sc.implicits._ val stat_date = GmeiConfig.getMinusNDate(1) //println(param.date) val partition_date = stat_date.replace("-","") val agency_id = sc.sql( s""" |SELECT DISTINCT(cl_id) as device_id |FROM online.ml_hospital_spam_pv_day |WHERE partition_date >= '20180402' |AND partition_date <= '${partition_date}' |AND pv_ratio >= 0.95 |UNION ALL |SELECT DISTINCT(cl_id) as device_id |FROM online.ml_hospital_spam_pv_month |WHERE partition_date >= '20171101' |AND partition_date <= '${partition_date}' |AND pv_ratio >= 0.95 """.stripMargin ) agency_id.createOrReplaceTempView("agency_id") val blacklist_id = sc.sql( s""" |SELECT device_id |from blacklist """.stripMargin ) blacklist_id.createOrReplaceTempView("blacklist_id") val final_id = sc.sql( s""" |select device_id |from agency_id |UNION ALL |select device_id |from blacklist_id """.stripMargin ) final_id.createOrReplaceTempView("final_id") val user_city_meigou_view = sc.sql( s""" |select ov.cl_id as device_id,ov.city_id as device_city,ov.params['business_id'] as meigou_id |from online.tl_hdfs_maidian_view ov left join final_id |on ov.cl_id = final_id.device_id |where ov.action = "page_view" |and ov.params['page_name']="welfare_detail" |and ov.partition_date >='20181101' |and ov.partition_date <'20181201' |and ov.city_id is not null |and final_id.device_id is null """.stripMargin ) user_city_meigou_view.createOrReplaceTempView("user_city_meigou_view") val meigou_city = sc.sql( s""" |select b.id as meigou_id,d.city_id as meigou_city |from online.tl_meigou_service_view b |left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id |left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id |where b.partition_date='20181228' |and c.partition_date='20181228' |and d.partition_date='20181228' """.stripMargin ) meigou_city.createOrReplaceTempView("meigou_city") val meigou_pv_tongcheng = sc.sql( s""" |select a.device_id,a.device_city,a.meigou_id,b.meigou_city |from user_city_meigou_view a |left join meigou_city b |on a.meigou_id = b.meigou_id """.stripMargin ) meigou_pv_tongcheng.createOrReplaceTempView("meigou_pv_tongcheng") val meigou_pv_count = sc.sql( s""" |select '2018-11' as stat_date,meigou_city,count(device_id) as meigou_pv,count(distinct(device_id)) as meigou_device_num |from meigou_pv_tongcheng |where device_city = meigou_city |group by meigou_city """.stripMargin ) meigou_pv_count.createOrReplaceTempView("meigou_pv_count") //开始计算咨询 val zixun_meigou_view = sc.sql( s""" |select ov.cl_id as device_id,ov.city_id as device_city,ov.params['service_id'] as meigou_id |from online.tl_hdfs_maidian_view ov left join final_id |on ov.cl_id = final_id.device_id |where ov.partition_date >= '20181101' |and ov.partition_date < '20181201' |and ov.action = 'welfare_detail_click_message' |and final_id.device_id is null """.stripMargin ) zixun_meigou_view.createOrReplaceTempView("zixun_meigou_view") val zixun_meigou_tongcheng = sc.sql( s""" |select a.device_id,a.device_city,a.meigou_id,b.meigou_city |from zixun_meigou_view a |left join meigou_city b |on a.meigou_id=b.meigou_id """.stripMargin ) zixun_meigou_tongcheng.createOrReplaceTempView("zixun_meigou_tongcheng") val zixun_pv_count = sc.sql( s""" |select '2018-11' as stat_date,meigou_city,count(device_id) as meigou_zixun,count(distinct(device_id)) as meigou_zixun_device_num |from zixun_meigou_tongcheng |where device_city=meigou_city |group by meigou_city """.stripMargin ) zixun_pv_count.createOrReplaceTempView("zixun_pv_count") //开始计算每个地区每月新增设备 val device_new_count = sc.sql( s""" |select first_city,count(distinct(device_id)) as new_device_month |from online.ml_device_day_active_status |where active_type != '4' |and partition_date >='20181101' |and partition_date <'20181201' |group by first_city """.stripMargin ) device_new_count.createOrReplaceTempView("device_new_count") //将所有的数据综合一起 val all_count = sc.sql( s""" |select mc.stat_date,mc.meigou_city,mc.meigou_pv,mc.meigou_device_num,zc.meigou_zixun,zc.meigou_zixun_device_num,dc.new_device_month |from meigou_pv_count mc |left join zixun_pv_count zc on mc.meigou_city = zc.meigou_city |left join device_new_count dc on dc.first_city=mc.meigou_city """.stripMargin ) all_count.show() GmeiConfig.writeToJDBCTable(all_count, "smart_rank_count", SaveMode.Append) } } } //话题相关问题统计 object question_count { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = "2018-08-01" ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("WeafareStat") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String] ("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 val ti = new TiContext(sc) ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click") ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist") ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure") ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table") import sc.implicits._ val stat_date = GmeiConfig.getMinusNDate(1) //println(param.date) val partition_date = stat_date.replace("-","") val question_id=List(212264,212266,212272,212281,212287,212436,212439,212437,212505,212506,212507,212522,212523,212526,212532,212783,212787,212789,212793,212796,213202,213199,213216,213219,213297,213224,213226,213239,213300,213302,213316,213307,213308,213370,213377,213349,213358,213368,213392,213393,213435,213453,213445,213448,213458,213466,213471,213478,213485,213638,213642,213644,213727,213729,213775,213776,213810,213817,213805,213821,213884,213885,213892,213834,213879,214043,214050,214062,214055,214056,214058,214064,214159,214182,214149,214184,214190,214206,214227,214243,214242,214288,214289,214293,214295,214541,214544,214546,214614,214618,214619,214620,214682,214683,214684,214848,214850,214854,214856,214857,214903,214908,214913,214918,214919,214980,214981,214988,214985,215031,215034,215036,215039,215094,215098,215104,215107,215112,215222,215225,215233,215237,215265,215366,215347,215346,215343) for( a <- question_id ){ val agency_id = sc.sql( s""" |SELECT partition_date,count(cl_id) |FROM online.tl_hdfs_maidian_view |WHERE partition_date >= '20190101' |and action='community_home_click_feed_card' |and params["card_type"]="问题" |and params['business_id']='212264' |group by partition_date |order by partition_date """.stripMargin ) println("question_id:",a) agency_id.show(20) } } } }