Commit 83d8c803 authored by 张彦钊's avatar 张彦钊

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

删除预测时生成的日志
parents ead4da69 2bf4c078
......@@ -62,115 +62,55 @@ object app_list {
import sc.implicits._
val stat_date = GmeiConfig.getMinusNDate(1)
println(param.date)
val partition_date = param.date.replace("-","")
println(stat_date)
val partition_date = stat_date.replace("-","")
println(partition_date)
//自定义udf函数,增加dataframe 列
val code = (arg: String) => {
if (arg.getClass.getName == "java.lang.String") partition_date.toInt else 0.toInt
}
val addCol = udf(code)
//以上为udf函数
//机构ID
val agency_id = sc.sql(
//每日新用户
val device_id_newUser = sc.sql(
s"""
|SELECT DISTINCT(cl_id) as device_id
|FROM online.ml_hospital_spam_pv_day
|WHERE partition_date >= '20180402'
|AND partition_date <= '20181120'
|AND pv_ratio >= 0.95
|UNION ALL
|SELECT DISTINCT(cl_id) as device_id
|FROM online.ml_hospital_spam_pv_month
|WHERE partition_date >= '20171101'
|AND partition_date <= '20181120'
|AND pv_ratio >= 0.95
|select distinct(device_id) as device_id
|from online.ml_device_day_active_status
|where (active_type = '1' or active_type='2')
|and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
| ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
| ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
| ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
| ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
| ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
| ,'promotion_shike','promotion_julang_jl03','','unknown','promotion_zuimei')
|and partition_date ='${partition_date}'
""".stripMargin
)
agency_id.show()
agency_id.createOrReplaceTempView("agency_id")
device_id_newUser.createOrReplaceTempView("device_id_new")
//获取与新氧用户重合的用户device_id
val app_list = sc.sql(
s"""
|select distinct(cl_id) as device_id, user_id as user_id, params['installed_app_info'] as app_list,channel
|from online.tl_hdfs_maidian_view ov left join agency_id
|on ov.cl_id = agency_id.device_id
|where ov.action="user_installed_all_app_info"
|and ov.partition_date = '${partition_date}'
|and agency_id.device_id is null
|and ov.cl_id not in (select distinct(device_id) from blacklist)
|select '${stat_date}' as stat_date,a.params as app_list
|from online.tl_hdfs_maidian_view a
|inner join device_id_new b
|on a.cl_id=b.device_id
|where a.partition_date ='${partition_date}'
|and a.action='user_installed_all_app_info'
""".stripMargin
)
//app_list.show()
import sc.implicits._
val rdd_df = app_list.rdd.map(x =>(x(0).toString,x(1).toString,x(2).toString,x(3).toString))
.filter(x => x._3.contains("新氧美容")).map(x => (x._1,x._2,x._3,x._4)).collect().toList.toDF("device_id","user_id","app_list","channel")
rdd_df.show()
//rdd_df.withColumn("stat_date",addCol(rdd_df("device_id")))
rdd_df.createOrReplaceTempView("device_id")
app_list.show()
val temp = sc.sql(
s"""
|select *
|from device_id
""".stripMargin
)
val tempp=temp.withColumn("stat_date",addCol(temp("device_id")))
tempp.show()
GmeiConfig.writeToJDBCTable(tempp, "device_id_coincidence", SaveMode.Append)
val result1 = app_list
result1.show()
//所有获得应用列表的用户device_id
val app_list_all = sc.sql(
s"""
|select distinct(cl_id) as device_id, user_id as user_id,params['installed_app_info'] as app_list,channel
|from online.tl_hdfs_maidian_view ov left join agency_id
|on ov.cl_id = agency_id.device_id
|where action="user_installed_all_app_info"
|and agency_id.device_id is null
|and ov.partition_date = '${partition_date}'
|and ov.cl_id not in (select distinct(device_id) from blacklist)
""".stripMargin
)
val tempp_list=app_list_all.withColumn("stat_date",addCol(app_list_all("device_id")))
println("开始写入")
GmeiConfig.writeToJDBCTable(tempp_list, "device_id_applist", SaveMode.Append)
GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",result1, table="app_list_yunying",SaveMode.Overwrite)
println("写入完成")
//在更美有消费的用户列表
val device_id_meigou = sc.sql(
s"""
|select DISTINCT(od.device_id) as device_id
|from online.ml_meigou_order_detail od left join agency_id
|on od.device_id = agency_id.device_id
|where od.partition_date = '20181118'
|and od.pay_time is not null
|and od.pay_time >= '2017-11-18'
|and agency_id.device_id is null
|and od.device_id not in (select distinct(device_id) from blacklist)
""".stripMargin
)
device_id_meigou.createOrReplaceTempView("device_id_meigou")
val app_list_meigou = sc.sql(
s"""
|select distinct(ov.cl_id) as device_id, user_id as user_id,params['installed_app_info'] as app_list,channel
|from online.tl_hdfs_maidian_view ov left join device_id_meigou
|on ov.cl_id = device_id_meigou.device_id
|where ov.action="user_installed_all_app_info"
|and device_id_meigou.device_id is not null
|and ov.partition_date = '${partition_date}'
""".stripMargin
)
val applist_meigou=app_list_meigou.withColumn("stat_date",addCol(app_list_meigou("device_id")))
GmeiConfig.writeToJDBCTable(applist_meigou, "device_id_applist_meigou", SaveMode.Append)
......
......@@ -804,19 +804,18 @@ object tag_value {
level_index_temp.createOrReplaceTempView("level_index_temp")
// temp1.createOrReplaceTempView("temp1")
// import implicit_
val result = level_index_temp.select("tag_id").distinct().rdd.map{x => x.toString().substring(1,x.toString().length - 1)}.zipWithIndex().toDF("level_id","index_id")
val test=result.select(result.col("level_id").cast(DoubleType).as("level_id"),result.col("index_id").cast(DoubleType).as("index_id"))
test.createOrReplaceTempView("tag_level_index")
// val resDf = spark.createDataFrame(rowRdd)
println("开始写入")
GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",test, table="tag_level_index",SaveMode.Overwrite)
println("写入完成")
// val result = level_index_temp.select("tag_id").distinct().rdd.map{x => x.toString().substring(1,x.toString().length - 1)}.zipWithIndex().toDF("level_id","index_id")
// val test=result.select(result.col("level_id").cast(DoubleType).as("level_id"),result.col("index_id").cast(DoubleType).as("index_id"))
// test.createOrReplaceTempView("tag_level_index")
//// val resDf = spark.createDataFrame(rowRdd)
//
//
// println("开始写入")
// GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",test, table="tag_level_index",SaveMode.Overwrite)
//
// println("写入完成")
// zhengxing库里面的数据同步到jerry_prod
......@@ -825,7 +824,7 @@ object tag_value {
s"""
|select a.device_id,c.index_id as action,b.value from jerry_prod.icon_train_data a
|left join tagId_value b on a.tag_id=b.level_id
|left join tag_level_index c on a.tag_id=c.level_id
|left join jerry_prod.tag_level_index c on a.tag_id=c.level_id
|where c.index_id is not null and b.value is not null
|and a.stat_date='${stat_date}'
""".stripMargin
......@@ -847,6 +846,110 @@ object tag_value {
object app_list_yunying {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev",
date: String = "2018-08-01"
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("WeafareStat")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
opt[String] ("date")
.text(s"the date you used")
.action((x,c) => c.copy(date = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}
def main(args: Array[String]): Unit = {
parser.parse(args, defaultParams).map { param =>
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
sc.sql("use jerry_prod")
import sc.implicits._
val stat_date = GmeiConfig.getMinusNDate(1)
// val stat_date = param.date
//println(param.date)
val partition_date = stat_date.replace("-","")
val device_id_newUser = sc.sql(
s"""
|select distinct(device_id) as device_id
|from online.ml_device_day_active_status
|where (active_type = '1' or active_type='2')
|and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
| ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
| ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
| ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
| ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
| ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
| ,'promotion_shike','promotion_julang_jl03','','unknown','promotion_zuimei')
|and partition_date ='${partition_date}'
""".stripMargin
)
device_id_newUser.createOrReplaceTempView("device_id_new")
val app_list = sc.sql(
s"""
|select '${stat_date}' as stat_date,channel,a.params['installed_app_info'] as app_list
|from online.tl_hdfs_maidian_view a
|inner join device_id_new b
|on a.cl_id=b.device_id
|where a.partition_date ='${partition_date}'
|and a.action='user_installed_all_app_info'
|and a.cl_type='android'
""".stripMargin
)
app_list.show()
val result1 = app_list
result1.show()
println("开始写入")
println("开始写入")
GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",result1, table="app_list_yunying",SaveMode.Overwrite)
println("写入完成")
}
}
}
object test_data {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment