Commit dbdb37ff authored by 王志伟's avatar 王志伟

新统计需求

parent fedef757
......@@ -236,62 +236,37 @@ object diary_exposure {
|and de.stat_date ='${stat_date}'
""".stripMargin
)
val diary_id = diary_id_temp.rdd.map(x =>(x(0).toString).map(x => (x._)).collect().toList.toDF("device_id")
diary_id_temp.createOrReplaceTempView("diary_id_temp")
val diary_id = diary_id_temp.rdd.map(x =>x(0).toString).collect()
val temp1 = sc.sql(
val cid_tag =
s"""
|select diary_id,tag_id
|from api_diary_tags
|select diary_id,tag_id from api_diary_tags
|where diary_id in (${diary_id.map(x => s"'$x'").mkString(",")})
""".stripMargin
)
temp1.createOrReplaceTempView("temp1")
val temp2 = sc.sql(
val cid_city = mysql_df(sc,mimas_url,"api_diary_tags",mimas_user,mimas_password,cid_tag)
val tag_list = cid_city.select("tag_id").collect().map(x => x(0).toString).distinct
val tag_city =
s"""
|select distinct(temp1.diary_id) as diary_id,a.name as city
|from api_tag a left join temp1
|on a.id = temp1.tag_id
|where a.tag_type = 4
""".stripMargin
)
temp2.createOrReplaceTempView("temp2")
|select id,name from api_tag where tag_type = 4
|and id in (${tag_list.map(x => s"'$x'").mkString(",")})
""".stripMargin
val city_df = mysql_df(sc,zhengxing_url,"api_tag",zhengxing_user,zhengxing_password,tag_city)
.na.drop().withColumnRenamed("id","tag_id")
val df_cid_city = cid_city.join(city_df,Seq("tag_id"),"left_outer").na.drop()
.drop("tag_id")
val final_cid_city = diary_id_temp.join(df_cid_city,Seq("diary_id"),"left_outer").na.drop()
final_cid_city.show()
val exposure_city = sc.sql(
s"""
|select temp2.diary_id as diary_id,count(a.cid_id) as imp_num,temp2.city as city
|from data_feed_exposure a left join temp2
|on a.cid_id = temp2.diary_id
|where a.stat_date='${stat_date}'
|and temp2.diary_id is not null
""".stripMargin
)
exposure_city.createOrReplaceTempView("exposure_city")
val city_exposure = sc.sql(
s"""
|select count(imp_num) as num,city
|from exposure_city
|where group by city order by num
""".stripMargin
)
city_exposure.show(20)
city_exposure.createOrReplaceTempView("city_exposure")
val final_count = sc.sql(
s"""
|select city,num/count(num)
|from city_exposure
""".stripMargin
)
final_count.show()
val temp = final_count.collect()
for (i <- 0 until temp.length ) {
println(temp(i))
}
//GmeiConfig.writeToJDBCTable(diary_id, "dairy_id_queue", SaveMode.Append)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment