Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
F
ffm-baseline
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ML
ffm-baseline
Commits
7832deda
Commit
7832deda
authored
Oct 30, 2018
by
王志伟
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
返回到29号的版本,git版本回退有问题,所以采用这个方法
parent
3b315a1f
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
83 additions
and
114 deletions
+83
-114
strategy_clk_imp_oldUser.scala
...da/src/main/scala/com/gmei/strategy_clk_imp_oldUser.scala
+83
-114
No files found.
eda/feededa/src/main/scala/com/gmei/strategy_clk_imp_oldUser.scala
View file @
7832deda
package
com.gmei
import
java.io.Serializable
import
java.text.SimpleDateFormat
import
java.util.Calendar
import
com.gmei.WeafareStat.
{
defaultParams
,
parser
}
import
org.apache.spark.sql.
{
SaveMode
,
TiContext
}
...
...
@@ -10,15 +8,13 @@ import org.apache.log4j.{Level, Logger}
import
scopt.OptionParser
import
com.gmei.lib.AbstractParams
import
scala.collection.mutable.ArrayBuffer
object
strategy_clk_imp_oldUser
{
Logger
.
getLogger
(
"org.apache.spark"
).
setLevel
(
Level
.
WARN
)
Logger
.
getLogger
(
"org.apache.eclipse.jetty.server"
).
setLevel
(
Level
.
OFF
)
case
class
Params
(
env
:
String
=
"dev"
//
date: String = "2018-08-01"
/**
  * Command-line parameters for this job.
  *
  * @param env  value of the `--env` option (described by the parser as the database environment)
  * @param date value of the `--date` option, written as `yyyy-MM-dd`
  */
case class Params(
  env: String = "dev",
  date: String = "2018-08-01"
) extends AbstractParams[Params]
  with Serializable
val
defaultParams
=
Params
()
...
...
@@ -28,9 +24,9 @@ object strategy_clk_imp_oldUser {
opt
[
String
](
"env"
)
.
text
(
s
"the databases environment you used"
)
.
action
((
x
,
c
)
=>
c
.
copy
(
env
=
x
))
//
opt[String] ("date")
//
.text(s"the date you used")
//
.action((x,c) => c.copy(date = x))
opt
[
String
]
(
"date"
)
.
text
(
s
"the date you used"
)
.
action
((
x
,
c
)
=>
c
.
copy
(
date
=
x
))
note
(
"""
|For example, the following command runs this app on a tidb dataset:
...
...
@@ -41,28 +37,6 @@ object strategy_clk_imp_oldUser {
)
}
// Build the array of dates to process
/**
  * Lists every calendar day from `startTime` to `endTime`, both bounds included.
  *
  * Each day is rendered as `yyyy-MM-dd`, in ascending order. If `endTime` is
  * earlier than `startTime` the result is empty.
  *
  * NOTE(review): the end bound used to be hard-coded as "2017-10-10", which is
  * before the start bound "2018-10-07" and therefore always produced an empty
  * result. The default below assumes that was a typo for 2018 -- confirm.
  *
  * @param startTime first day of the range, formatted `yyyy-MM-dd`
  * @param endTime   last day of the range, formatted `yyyy-MM-dd`
  * @return the days of the range as formatted strings
  * @throws java.text.ParseException if either bound cannot be parsed
  */
def get_date(startTime: String = "2018-10-07", endTime: String = "2018-10-10"): ArrayBuffer[String] = {
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
  val endDate = dateFormat.parse(endTime)

  // The calendar is the single cursor over the range; no separate mutable date is needed.
  val calendar = Calendar.getInstance()
  calendar.setTime(dateFormat.parse(startTime))

  val dateArray: ArrayBuffer[String] = ArrayBuffer()
  // "not after" keeps the end bound inclusive.
  while (!calendar.getTime.after(endDate)) {
    dateArray += dateFormat.format(calendar.getTime)
    calendar.add(Calendar.DAY_OF_MONTH, 1)
  }
  dateArray
}
def
main
(
args
:
Array
[
String
])
:
Unit
=
{
parser
.
parse
(
args
,
defaultParams
).
map
{
param
=>
GmeiConfig
.
setup
(
param
.
env
)
...
...
@@ -78,106 +52,102 @@ object strategy_clk_imp_oldUser {
import
sc.implicits._
// val stat_date = GmeiConfig.getMinusNDate(1)
//println(param.date)
val
date_array
=
get_date
()
date_array
.
foreach
(
println
)
for
(
elem
<-
date_array
)
{
val
partition_date
=
elem
.
replace
(
"-"
,
""
)
println
(
partition_date
)
val
decive_id_oldUser
=
sc
.
sql
(
// val stat_date = GmeiConfig.getMinusNDate(1)
println
(
param
.
date
)
val
partition_date
=
param
.
date
.
replace
(
"-"
,
""
)
val
decive_id_oldUser
=
sc
.
sql
(
s
"""
|select distinct(device_id) as decive_id
|from online.ml_device_day_active_status
|where active_type = '4'
|and partition_date ='${partition_date}'
"""
.
stripMargin
)
decive_id_oldUser
.
show
()
decive_id_oldUser
.
createOrReplaceTempView
(
"device_id_old"
)
val
decive_id_newUser
=
sc
.
sql
(
s
"""
|select distinct(device_id) as decive_id
|from online.ml_device_day_active_status
|where active_type != '4'
|and partition_date ='${partition_date}'
"""
.
stripMargin
)
decive_id_newUser
.
show
()
decive_id_newUser
.
createOrReplaceTempView
(
"device_id_newUser"
)
val
strategies
=
Seq
(
"[1|2]$"
,
"[3|4]$"
,
"[5|6]$"
,
"[7|8]$"
)
for
(
strategy
<-
strategies
){
val
clk_count_oldUser
=
sc
.
sql
(
s
"""
|select distinct(device_id) as decive_id
|from online.ml_device_day_active_status
|where active_type = '4'
|and partition_date ='${partition_date}'
|select '${param.date}' as stat_date, count(cid_id) as get_clk_count_old
|from data_feed_click jd inner join device_id_old
|on jd.device_id = device_id_old.decive_id
|where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
|and jd.device_id regexp'${strategy}'
|and jd.device_id not in (select device_id from bl_device_list)
|and jd.device_id not in (select device_id from blacklist)
|and jd.stat_date ='${param.date}'
"""
.
stripMargin
)
decive_id_oldUser
.
show
()
decive_id_oldUser
.
createOrReplaceTempView
(
"device_id_old"
)
val
decive_id_new
User
=
sc
.
sql
(
val
imp_count_old
User
=
sc
.
sql
(
s
"""
|select distinct(device_id) as decive_id
|from online.ml_device_day_active_status
|where active_type != '4'
|and partition_date ='${partition_date}'
|select '${param.date}' as stat_date, count(cid_id) as get_imp_count_old
|from data_feed_exposure je inner join device_id_old
|on je.device_id = device_id_old.decive_id
|where je.cid_type = 'diary'
|and je.device_id regexp'${strategy}'
|and je.device_id not in (select device_id from bl_device_list)
|and je.device_id not in (select device_id from blacklist)
|and je.stat_date ='${param.date}'
"""
.
stripMargin
)
decive_id_newUser
.
show
()
decive_id_newUser
.
createOrReplaceTempView
(
"device_id_newUser"
)
val
strategies
=
Seq
(
"[1|2]$"
,
"[3|4]$"
,
"[5|6]$"
,
"[7|8]$"
)
for
(
strategy
<-
strategies
){
println
(
strategy
)
val
clk_count_oldUser
=
sc
.
sql
(
s
"""
|select '${elem}' as stat_date, count(cid_id) as get_clk_count_old
|from data_feed_click jd inner join device_id_old
|on jd.device_id = device_id_old.decive_id
|where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
|and jd.device_id regexp'${strategy}'
|and jd.device_id not in (select device_id from bl_device_list)
|and jd.device_id not in (select device_id from blacklist)
|and jd.stat_date ='${elem}'
"""
.
stripMargin
)
val
imp_count_oldUser
=
sc
.
sql
(
s
"""
|select '${elem}' as stat_date, count(cid_id) as get_imp_count_old
|from data_feed_exposure je inner join device_id_old
|on je.device_id = device_id_old.decive_id
|where je.cid_type = 'diary'
|and je.device_id regexp'${strategy}'
|and je.device_id not in (select device_id from bl_device_list)
|and je.device_id not in (select device_id from blacklist)
|and je.stat_date ='${elem}'
"""
.
stripMargin
)
val
clk_count_newUser
=
sc
.
sql
(
s
"""
|select '${elem}' as stat_date, count(cid_id) as get_clk_count_newUser
|from data_feed_click jd inner join device_id_newUser
|on jd.device_id = device_id_newUser.decive_id
|where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
|and jd.device_id regexp'${strategy}'
|and jd.device_id not in (select device_id from bl_device_list)
|and jd.device_id not in (select device_id from blacklist)
|and jd.stat_date ='${elem}'
val
clk_count_newUser
=
sc
.
sql
(
s
"""
|select '${param.date}' as stat_date, count(cid_id) as get_clk_count_newUser
|from data_feed_click jd inner join device_id_newUser
|on jd.device_id = device_id_newUser.decive_id
|where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
|and jd.device_id regexp'${strategy}'
|and jd.device_id not in (select device_id from bl_device_list)
|and jd.device_id not in (select device_id from blacklist)
|and jd.stat_date ='${param.date}'
"""
.
stripMargin
)
val
imp_count_newUser
=
sc
.
sql
(
s
"""
|select '${elem
}' as stat_date, count(cid_id) as get_imp_count_newUser
|from data_feed_exposure je inner join device_id_newUser
|on je.device_id = device_id_newUser.decive_id
|where je.cid_type = 'diary'
|and je.device_id regexp'${strategy}'
|and je.device_id not in (select device_id from bl_device_list)
|and je.device_id not in (select device_id from blacklist)
|and je.stat_date ='${elem
}'
)
val
imp_count_newUser
=
sc
.
sql
(
s
"""
|select '${param.date
}' as stat_date, count(cid_id) as get_imp_count_newUser
|from data_feed_exposure je inner join device_id_newUser
|on je.device_id = device_id_newUser.decive_id
|where je.cid_type = 'diary'
|and je.device_id regexp'${strategy}'
|and je.device_id not in (select device_id from bl_device_list)
|and je.device_id not in (select device_id from blacklist)
|and je.stat_date ='${param.date
}'
"""
.
stripMargin
)
imp_count_newUser
.
show
()
)
imp_count_newUser
.
show
()
val
result
=
clk_count_oldUser
.
join
(
imp_count_oldUser
,
"stat_date"
)
.
join
(
clk_count_newUser
,
"stat_date"
)
.
join
(
imp_count_newUser
,
"stat_date"
)
result
.
show
()
val
result
=
clk_count_oldUser
.
join
(
imp_count_oldUser
,
"stat_date"
)
.
join
(
clk_count_newUser
,
"stat_date"
)
.
join
(
imp_count_newUser
,
"stat_date"
)
result
.
show
()
}
}
}
}
}
\ No newline at end of file
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment