Commit fedef757 authored by 王志伟's avatar 王志伟

提交

parent e7c2d6a6
......@@ -380,7 +380,6 @@ object Recommendation_strategy_all {
}
......
......@@ -3,10 +3,12 @@ package com.gmei
import java.io.Serializable
import com.gmei.WeafareStat.{defaultParams, parser}
import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.spark.sql.{DataFrame, SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams
//import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession
import org.apache.spark.sql.SparkSession
object strategy_other {
......@@ -175,12 +177,70 @@ object diary_exposure {
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "mimas_prod", tableName = "api_diary_tags")
ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
// ti.tidbMapTable(dbName = "mimas_prod", tableName = "api_diary_tags")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
ti.tidbMapTable(dbName = "zhengxing", tableName = "api_tag")
// ti.tidbMapTable(dbName = "zhengxing", tableName = "api_tag")
val mimas_url ="jdbc:mysql://rr-m5et21lafq1677pid.mysql.rds.aliyuncs.com/mimas_prod"
val mimas_user = "mimas"
val mimas_password = "GJL3UJe1Ck9ggL6aKnZCq4cRvM"
val zhengxing_url = "jdbc:mysql://rdsfewzdmf0jfjp9un8xj.mysql.rds.aliyuncs.com/zhengxing"
val zhengxing_user = "work"
val zhengxing_password = "BJQaT9VzDcuPBqkd"
def mysql_df(spark:SparkSession,url:String,table:String,user:String,password:String,sql:String): DataFrame ={
val jdbcDF = spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", url)
.option("dbtable", table)
.option("user", user)
.option("password", password)
.load()
jdbcDF.createOrReplaceTempView(table)
try {spark.sql(sql)}
catch {case _ => spark.emptyDataFrame}
}
val stat_date = GmeiConfig.getMinusNDate(1)
val partition_date = stat_date.replace("-","")
//机构ID
val agency_id = sc.sql(
s"""
|SELECT DISTINCT(cl_id) as device_id
|FROM online.ml_hospital_spam_pv_day
|WHERE partition_date >= '20180402'
|AND partition_date <= '${partition_date}'
|AND pv_ratio >= 0.95
|UNION ALL
|SELECT DISTINCT(cl_id) as device_id
|FROM online.ml_hospital_spam_pv_month
|WHERE partition_date >= '20171101'
|AND partition_date <= '${partition_date}'
|AND pv_ratio >= 0.95
""".stripMargin
)
agency_id.show()
agency_id.createOrReplaceTempView("agency_id")
val diary_id_temp = sc.sql(
s"""
|select cid_id as diary_id
|from data_feed_exposure de left join agency_id
|on de.device_id = agency_id.device_id
|where de.cid_type = 'diary'
|and de.device_id not in (select device_id from blacklist)
|and agency_id.device_id is null
|and de.stat_date ='${stat_date}'
""".stripMargin
)
val diary_id = diary_id_temp.rdd.map(x =>(x(0).toString).map(x => (x._)).collect().toList.toDF("device_id")
val temp1 = sc.sql(
s"""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment