Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
F
ffm-baseline
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ML
ffm-baseline
Commits
1fbc488a
Commit
1fbc488a
authored
Dec 11, 2018
by
张彦钊
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline
add data
parents
ef710cfa
5e213f0c
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
358 additions
and
94 deletions
+358
-94
Data2FFM.scala
eda/feededa/src/main/scala/com/gmei/Data2FFM.scala
+19
-6
EsmmData.scala
eda/feededa/src/main/scala/com/gmei/EsmmData.scala
+66
-50
strategy_other.scala
eda/feededa/src/main/scala/com/gmei/strategy_other.scala
+0
-4
testt.scala
eda/feededa/src/main/scala/com/gmei/testt.scala
+272
-33
Recommendation_strategy_all.py
eda/gray_stat/Recommendation_strategy_all.py
+1
-1
No files found.
eda/feededa/src/main/scala/com/gmei/Data2FFM.scala
View file @
1fbc488a
...
...
@@ -52,10 +52,11 @@ object Data2FFM {
ti
.
tidbMapTable
(
dbName
=
"jerry_test"
,
tableName
=
"esmm_pre_data"
)
// val yesteday_have_seq = GmeiConfig.getMinusNDate(5
)
val
train_sep_date
=
GmeiConfig
.
getMinusNDate
(
14
)
val
esmm_data
=
sc
.
sql
(
s
"""
|select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name from esmm_train_data
|where stat_date > '${train_sep_date}'
"""
.
stripMargin
).
repartition
(
200
).
na
.
drop
()
val
column_list
=
esmm_data
.
columns
.
filter
(
x
=>
x
!=
"y"
&&
x
!=
"z"
)
...
...
@@ -114,7 +115,7 @@ object Data2FFM {
val
esmm_pre_data
=
sc
.
sql
(
s
"""
|select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name
|select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name
,label
|from esmm_pre_data
"""
.
stripMargin
).
repartition
(
200
).
na
.
drop
()
...
...
@@ -132,10 +133,10 @@ object Data2FFM {
val
rdd_pre
=
esmm_pre_data
.
rdd
.
repartition
(
200
)
.
map
(
x
=>
(
x
(
0
).
toString
,
x
(
1
).
toString
,
x
(
2
).
toString
,
x
(
3
).
toString
,
x
(
4
).
toString
,
x
(
5
).
toString
,
x
(
6
).
toString
,
x
(
7
).
toString
)).
filter
(
x
=>
esmm_join_cids
.
indexOf
(
x
.
_6
)
!=
-
1
)
x
(
7
).
toString
,
x
(
8
).
toString
)).
filter
(
x
=>
esmm_join_cids
.
indexOf
(
x
.
_6
)
!=
-
1
)
.
filter
(
x
=>
esmm_join_city
.
indexOf
(
x
.
_5
)
!=
-
1
)
val
pre
=
rdd_pre
.
map
(
x
=>
(
x
.
_1
,
x
.
_2
,
x
.
_3
,
column_number
(
"device_id"
).
indexOf
(
x
.
_1
),
val
native_pre
=
rdd_pre
.
filter
(
x
=>
x
.
_9
==
"0"
)
.
map
(
x
=>
(
x
.
_1
,
x
.
_2
,
x
.
_3
,
column_number
(
"device_id"
).
indexOf
(
x
.
_1
),
column_number
(
"stat_date"
).
indexOf
(
x
.
_4
),
column_number
(
"ucity_id"
).
indexOf
(
x
.
_5
),
column_number
(
"cid_id"
).
indexOf
(
x
.
_6
),
column_number
(
"clevel1_id"
).
indexOf
(
x
.
_7
),
column_number
(
"ccity_name"
).
indexOf
(
x
.
_8
),
x
.
_5
,
x
.
_6
))
...
...
@@ -144,8 +145,20 @@ object Data2FFM {
.
map
(
x
=>
(
x
.
_1
.
_1
,
x
.
_2
,
x
.
_1
.
_2
,
x
.
_1
.
_3
,
x
.
_1
.
_4
,
x
.
_1
.
_5
,
x
.
_1
.
_6
,
x
.
_1
.
_7
))
.
map
(
x
=>
(
x
.
_1
,
x
.
_2
+
","
+
x
.
_3
+
","
+
x
.
_4
+
","
+
x
.
_5
,
x
.
_6
,
x
.
_7
,
x
.
_8
)).
toDF
(
"number"
,
"data"
,
"device_id"
,
"city_id"
,
"cid"
)
println
(
"pre"
)
pre
.
show
(
6
)
GmeiConfig
.
writeToJDBCTable
(
jdbcuri
,
pre
,
"esmm_data2ffm_infer"
,
SaveMode
.
Overwrite
)
native_pre
.
show
(
6
)
GmeiConfig
.
writeToJDBCTable
(
jdbcuri
,
native_pre
,
"esmm_data2ffm_infer_native"
,
SaveMode
.
Overwrite
)
val
nearby_pre
=
rdd_pre
.
filter
(
x
=>
x
.
_9
==
"1"
).
map
(
x
=>
(
x
.
_1
,
x
.
_2
,
x
.
_3
,
column_number
(
"device_id"
).
indexOf
(
x
.
_1
),
column_number
(
"stat_date"
).
indexOf
(
x
.
_4
),
column_number
(
"ucity_id"
).
indexOf
(
x
.
_5
),
column_number
(
"cid_id"
).
indexOf
(
x
.
_6
),
column_number
(
"clevel1_id"
).
indexOf
(
x
.
_7
),
column_number
(
"ccity_name"
).
indexOf
(
x
.
_8
),
x
.
_5
,
x
.
_6
))
.
map
(
x
=>
((
new
util
.
Random
).
nextInt
(
2147483647
),
x
.
_2
,
x
.
_3
,
"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0"
.
format
(
x
.
_4
,
x
.
_5
,
x
.
_6
,
x
.
_7
,
x
.
_8
,
x
.
_9
),
x
.
_1
,
x
.
_10
,
x
.
_11
)).
zipWithIndex
()
.
map
(
x
=>
(
x
.
_1
.
_1
,
x
.
_2
,
x
.
_1
.
_2
,
x
.
_1
.
_3
,
x
.
_1
.
_4
,
x
.
_1
.
_5
,
x
.
_1
.
_6
,
x
.
_1
.
_7
))
.
map
(
x
=>
(
x
.
_1
,
x
.
_2
+
","
+
x
.
_3
+
","
+
x
.
_4
+
","
+
x
.
_5
,
x
.
_6
,
x
.
_7
,
x
.
_8
)).
toDF
(
"number"
,
"data"
,
"device_id"
,
"city_id"
,
"cid"
)
println
(
"pre"
)
nearby_pre
.
show
(
6
)
GmeiConfig
.
writeToJDBCTable
(
jdbcuri
,
nearby_pre
,
"esmm_data2ffm_infer_nearby"
,
SaveMode
.
Overwrite
)
sc
.
stop
()
...
...
eda/feededa/src/main/scala/com/gmei/EsmmData.scala
View file @
1fbc488a
...
...
@@ -18,7 +18,8 @@ object EsmmData {
Logger
.
getLogger
(
"org.apache.spark"
).
setLevel
(
Level
.
WARN
)
Logger
.
getLogger
(
"org.apache.eclipse.jetty.server"
).
setLevel
(
Level
.
OFF
)
case
class
Params
(
env
:
String
=
"dev"
case
class
Params
(
env
:
String
=
"dev"
,
date
:
String
=
GmeiConfig
.
getMinusNDate
(
1
)
)
extends
AbstractParams
[
Params
]
with
Serializable
val
defaultParams
=
Params
()
...
...
@@ -28,6 +29,9 @@ object EsmmData {
opt
[
String
](
"env"
)
.
text
(
s
"the databases environment you used"
)
.
action
((
x
,
c
)
=>
c
.
copy
(
env
=
x
))
opt
[
String
](
"date"
)
.
text
(
s
"the date you used"
)
.
action
((
x
,
c
)
=>
c
.
copy
(
date
=
x
))
note
(
"""
|For example, the following command runs this app on a tidb dataset:
...
...
@@ -53,9 +57,8 @@ object EsmmData {
ti
.
tidbMapTable
(
dbName
=
"jerry_prod"
,
tableName
=
"data_feed_exposure"
)
println
(
"新修改的"
)
import
sc.implicits._
val
stat_date
=
GmeiConfig
.
getMinusNDate
(
14
)
val
stat_date
=
param
.
date
println
(
stat_date
)
val
imp_data
=
sc
.
sql
(
s
"""
...
...
@@ -63,9 +66,9 @@ object EsmmData {
| cid_id,diary_service_id
|from data_feed_exposure
|where cid_type = 'diary'
|and stat_date
>
'${stat_date}'
|and stat_date
=
'${stat_date}'
"""
.
stripMargin
)
.
repartition
(
200
)
)
// imp_data.show()
// println("imp_data.count()")
// println(imp_data.count())
...
...
@@ -77,9 +80,9 @@ object EsmmData {
| cid_id,diary_service_id
|from data_feed_click
|where cid_type = 'diary'
|and stat_date
>
'${stat_date}'
|and stat_date
=
'${stat_date}'
"""
.
stripMargin
)
.
repartition
(
200
)
)
// clk_data.show()
// println("clk_data.count()")
// println(clk_data.count())
...
...
@@ -93,7 +96,7 @@ object EsmmData {
// println(imp_data_filter.count())
val
stat_date_not
=
GmeiConfig
.
getMinusNDate
(
14
)
.
replace
(
"-"
,
""
)
val
stat_date_not
=
stat_date
.
replace
(
"-"
,
""
)
val
cvr_data
=
sc
.
sql
(
s
"""
|select distinct
...
...
@@ -102,11 +105,11 @@ object EsmmData {
| params["referrer_id"] as cid_id,params["business_id"] as diary_service_id
|from online.tl_hdfs_maidian_view
|where action='page_view'
|and partition_date
>
'${stat_date_not}'
|and partition_date
=
'${stat_date_not}'
|and params['page_name'] = 'welfare_detail'
|and params['referrer'] = 'diary_detail'
"""
.
stripMargin
)
.
repartition
(
200
)
)
val
cvr_data_filter
=
cvr_data
.
withColumn
(
"y"
,
lit
(
1
)).
withColumn
(
"z"
,
lit
(
1
))
// cvr_data_filter.createOrReplaceTempView("cvr_data_filter")
...
...
@@ -130,7 +133,7 @@ object EsmmData {
// println(union_data.count())
val
yesteday
=
GmeiConfig
.
getMinusNDate
(
1
).
replace
(
"-"
,
""
)
val
union_data_clabel
=
sc
.
sql
(
s
"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,
...
...
@@ -138,8 +141,8 @@ object EsmmData {
|from union_data a
|left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id
|left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
|where b.partition_date='${
yesteday
}'
|and c.partition_date='${
yesteday
}'
|where b.partition_date='${
stat_date_not
}'
|and c.partition_date='${
stat_date_not
}'
"""
.
stripMargin
)
union_data_clabel
.
createOrReplaceTempView
(
"union_data_clabel"
)
...
...
@@ -152,8 +155,8 @@ object EsmmData {
|from union_data_clabel a
|left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id
|left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
|where b.partition_date='${
yesteday
}'
|and c.partition_date='${
yesteday
}'
|where b.partition_date='${
stat_date_not
}'
|and c.partition_date='${
stat_date_not
}'
"""
.
stripMargin
)
union_data_slabel
.
createOrReplaceTempView
(
"union_data_slabel"
)
...
...
@@ -181,14 +184,14 @@ object EsmmData {
|left join online.tl_meigou_service_view b on a.diary_service_id=b.id
|left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id
|left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id
|where b.partition_date='${
yesteday
}'
|and c.partition_date='${
yesteday
}'
|and d.partition_date='${
yesteday
}'
|where b.partition_date='${
stat_date_not
}'
|and c.partition_date='${
stat_date_not
}'
|and d.partition_date='${
stat_date_not
}'
"""
.
stripMargin
)
// union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
union_data_scity_id
.
show
()
GmeiConfig
.
writeToJDBCTable
(
"jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
,
union_data_scity_id
,
table
=
"esmm_train_data"
,
SaveMode
.
Overwrite
)
GmeiConfig
.
writeToJDBCTable
(
"jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
,
union_data_scity_id
,
table
=
"esmm_train_data"
,
SaveMode
.
Append
)
...
...
@@ -240,24 +243,18 @@ object EsmmPredData {
ti
.
tidbMapTable
(
dbName
=
"eagle"
,
tableName
=
"src_mimas_prod_api_diary_tags"
)
ti
.
tidbMapTable
(
dbName
=
"eagle"
,
tableName
=
"src_zhengxing_api_tag"
)
ti
.
tidbMapTable
(
dbName
=
"jerry_prod"
,
tableName
=
"data_feed_exposure"
)
ti
.
tidbMapTable
(
dbName
=
"jerry_prod"
,
tableName
=
"data_feed_click"
)
ti
.
tidbMapTable
(
"jerry_prod"
,
"nd_device_cid_similarity_matrix"
)
ti
.
tidbMapTable
(
"eagle"
,
"ffm_diary_queue"
)
ti
.
tidbMapTable
(
"eagle"
,
"search_queue"
)
ti
.
tidbMapTable
(
dbName
=
"jerry_test"
,
tableName
=
"esmm_train_data"
)
ti
.
tidbMapTable
(
"eagle"
,
"biz_feed_diary_queue"
)
import
sc.implicits._
val
yesteday_have_seq
=
GmeiConfig
.
getMinusNDate
(
1
)
// val activate_data = sc.sql(
// s"""
// |select a.device_id,a.city_id as ucity_id,explode(split(a.search_queue, ',')) as cid_id
// |from merge_queue_table a
// |left join data_feed_exposure b on a.device_id=b.device_id and a.city_id=b.city_id
// |where b.stat_date ='${yesteday_have_seq}'
// |and b.device_id is not null
// """.stripMargin
// )
//nearby_data
val
raw_data
=
sc
.
sql
(
s
"""
|select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from
...
...
@@ -266,9 +263,9 @@ object EsmmPredData {
|select device_id,city_id,native_queue as merge_queue from ffm_diary_queue
|union
|select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1
|where tmp1.device_id in (select distinct device_id from
esmm_train_data
)
|where tmp1.device_id in (select distinct device_id from
data_feed_click where stat_date='${yesteday_have_seq}'
)
"""
.
stripMargin
)
.
repartition
(
200
)
)
raw_data
.
show
()
...
...
@@ -280,40 +277,59 @@ object EsmmPredData {
(
device_id
,
city_id
,
s
"$cids"
)
}.
filter
(
_
.
_3
!=
""
).
toDF
(
"device_id"
,
"city_id"
,
"merge_queue"
)
raw_data1
.
createOrReplaceTempView
(
"raw_data1"
)
println
(
raw_data1
.
count
())
println
(
"nearby_device_count"
,
raw_data1
.
count
())
val
raw_data2
=
sc
.
sql
(
s
"""
|select device_id,city_id
,merge_queue from raw_data1 limit 10000
|select device_id,city_id
as ucity_id,explode(split(merge_queue, ',')) as cid_id from raw_data1
"""
.
stripMargin
).
repartition
(
200
)
).
withColumn
(
"label"
,
lit
(
1
)
)
raw_data2
.
createOrReplaceTempView
(
"raw_data2"
)
println
(
raw_data2
.
count
())
raw_data2
.
show
()
println
(
"nearby_explode_count"
,
raw_data2
.
count
())
val
raw_data3
=
sc
.
sql
(
// native_data
val
native_data
=
sc
.
sql
(
s
"""
|select device_id,city_id as ucity_id,explode(split(merge_queue, ',')) as cid_id from raw_data2
|select distinct a.device_id,a.city_id,b.native_queue from data_feed_click a
|left join biz_feed_diary_queue b on a.city_id = b.city_id
|where a.stat_date='${yesteday_have_seq}' and b.native_queue != ""
"""
.
stripMargin
).
repartition
(
200
)
raw_data3
.
createOrReplaceTempView
(
"raw_data"
)
println
(
raw_data3
.
count
())
)
native_data
.
createOrReplaceTempView
(
"native_data"
)
println
(
"native_device_count"
,
native_data
.
count
())
val
native_data1
=
sc
.
sql
(
s
"""
|select device_id,city_id as ucity_id,
|explode(split(split(native_queue, concat(',',split(native_queue,',')[300]))[0],',')) as cid_id
|from native_data
"""
.
stripMargin
).
withColumn
(
"label"
,
lit
(
0
))
native_data1
.
createOrReplaceTempView
(
"native_data1"
)
println
(
"native_explode_count"
,
native_data1
.
count
())
// activate_data.createOrReplaceTempView("raw_data")
// raw_data.show()
//union
val
union_data
=
sc
.
sql
(
s
"""
|select device_id,ucity_id,cid_id,label from native_data1
|union
|select device_id,ucity_id,cid_id,label from raw_data2
"""
.
stripMargin
)
union_data
.
createOrReplaceTempView
(
"raw_data"
)
println
(
"union_count"
,
union_data
.
count
())
val
yesteday
=
GmeiConfig
.
getMinusNDate
(
1
).
replace
(
"-"
,
""
)
//join feat
val
yesteday
=
yesteday_have_seq
.
replace
(
"-"
,
""
)
val
sid_data
=
sc
.
sql
(
s
"""
|select distinct
| from_unixtime(unix_timestamp(partition_date ,'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
| a.device_id,a.ucity_id,a.cid_id, b.service_id as diary_service_id
| a.device_id,a.ucity_id,a.cid_id,
a.label,
b.service_id as diary_service_id
|from raw_data a
|left join online.ml_community_diary_updates b on a.cid_id = b.diary_id
|where b.partition_date = '${yesteday}'
...
...
@@ -328,7 +344,7 @@ object EsmmPredData {
val
union_data_clabel
=
sc
.
sql
(
s
"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.
label,a.
diary_service_id,a.y,a.z,
| c.level1_id as clevel1_id
|from union_data a
|left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id
...
...
@@ -342,7 +358,7 @@ object EsmmPredData {
val
union_data_slabel
=
sc
.
sql
(
s
"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.
label,a.
diary_service_id,a.y,a.z,a.clevel1_id,
| c.level1_id as slevel1_id
|from union_data_clabel a
|left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id
...
...
@@ -357,7 +373,7 @@ object EsmmPredData {
val
union_data_ccity_name
=
sc
.
sql
(
s
"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.
label,a.
diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
| c.name as ccity_name
|from union_data_slabel a
|left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id
...
...
@@ -370,7 +386,7 @@ object EsmmPredData {
val
union_data_scity_id
=
sc
.
sql
(
s
"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.
label,a.
diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
| d.city_id as scity_id
|from union_data_ccity_name a
|left join online.tl_meigou_service_view b on a.diary_service_id=b.id
...
...
eda/feededa/src/main/scala/com/gmei/strategy_other.scala
View file @
1fbc488a
...
...
@@ -240,10 +240,6 @@ object diary_exposure {
)
imp_count
.
show
()
//
//曝光表中的日记id,去除机构和黑名单
val
diary_id_temp
=
sc
.
sql
(
s
"""
...
...
eda/feededa/src/main/scala/com/gmei/testt.scala
View file @
1fbc488a
...
...
@@ -50,10 +50,17 @@ object testt {
ti
.
tidbMapTable
(
dbName
=
"jerry_prod"
,
tableName
=
"blacklist"
)
ti
.
tidbMapTable
(
dbName
=
"jerry_prod"
,
tableName
=
"data_feed_exposure"
)
//
val stat_date = GmeiConfig.getMinusNDate(1)
val
stat_date
=
param
.
date
val
stat_date
=
GmeiConfig
.
getMinusNDate
(
1
)
//
val stat_date=param.date
val
partition_date
=
stat_date
.
replace
(
"-"
,
""
)
//机构id
val
blacklist
=
sc
.
sql
(
s
"""
|select device_id from blacklist
"""
.
stripMargin
)
blacklist
.
createOrReplaceTempView
(
"blacklist"
)
val
agency_id
=
sc
.
sql
(
s
"""
|SELECT DISTINCT(cl_id) as device_id
...
...
@@ -69,87 +76,319 @@ object testt {
|AND pv_ratio >= 0.95
"""
.
stripMargin
)
//
agency_id.show()
agency_id
.
show
()
agency_id
.
createOrReplaceTempView
(
"agency_id"
)
//每日新用户
val
device_id_newUser
=
sc
.
sql
(
s
"""
|select distinct(device_id) as device_id
|from online.ml_device_day_active_status
|where active_type != '4'
|and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
| ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
| ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
| ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
| ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
| ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
| ,'promotion_shike','promotion_julang_jl03')
|and partition_date ='${partition_date}'
"""
.
stripMargin
)
device_id_newUser
.
show
()
device_id_newUser
.
createOrReplaceTempView
(
"device_id_new"
)
//每日老用户
val
device_id_oldUser
=
sc
.
sql
(
s
"""
|select distinct(device_id) as device_id
|from online.ml_device_day_active_status
|where active_type = '4'
|and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
| ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
| ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
| ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
| ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
| ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
| ,'promotion_shike','promotion_julang_jl03')
|and partition_date ='${partition_date}'
"""
.
stripMargin
)
device_id_oldUser
.
show
()
device_id_oldUser
.
createOrReplaceTempView
(
"device_id_old"
)
//日记本转化美购
//1.日记本到美购转化数
val
diary_meigou_
count
=
sc
.
sql
(
val
diary_meigou_
temp
=
sc
.
sql
(
s
"""
|select
'${stat_date}' as stat_date, count(page_name) as diary_meigou_count
|select
ou.cl_id as device_id
|from online.bl_hdfs_page_view_updates ou left join agency_id
|on ou.cl_id = agency_id.device_id
|where ou.partition_date = '${partition_date}'
|and ou.page_name='welfare_detail'
|and ou.referrer='diary_detail'
|and agency_id.device_id is not null
|and ou.cl_id not in (select device_id from blacklist)
|and agency_id.device_id is null
"""
.
stripMargin
)
diary_meigou_temp
.
createOrReplaceTempView
(
"diary_meigou_temp"
)
val
diary_meigou_device
=
sc
.
sql
(
s
"""
|select dt.device_id
|from diary_meigou_temp dt left join blacklist
|on dt.device_id = blacklist.device_id
|where blacklist.device_id is null
"""
.
stripMargin
)
diary_meigou_device
.
createOrReplaceTempView
(
"diary_meigou_device"
)
//新用户到美购详情页的转化
val
diary_meigou_newUser
=
sc
.
sql
(
s
"""
|select '${stat_date}' as stat_date, count(dd.device_id) as diary_meigou_newUser
|from diary_meigou_device dd left join device_id_new
|on dd.device_id = device_id_new.device_id
|where device_id_new.device_id is not null
"""
.
stripMargin
)
diary_meigou_count
.
show
()
//老用户到美购详情页的转化
val
diary_meigou_oldUser
=
sc
.
sql
(
s
"""
|select '${stat_date}' as stat_date, count(dd.device_id) as diary_meigou_oldUser
|from diary_meigou_device dd left join device_id_old
|on dd.device_id = device_id_old.device_id
|where device_id_old.device_id is not null
"""
.
stripMargin
)
// val diary_meigou_count = sc.sql(
// s"""
// |select '${stat_date}' as stat_date, count(page_name) as diary_meigou_count
// |from online.bl_hdfs_page_view_updates ou left join agency_id
// |on ou.cl_id = agency_id.device_id
// |where ou.partition_date = '${partition_date}'
// |and ou.page_name='welfare_detail'
// |and ou.referrer='diary_detail'
// |and agency_id.device_id is null
// |and ou.cl_id not in (select device_id from blacklist)
// """.stripMargin
// )
// diary_meigou_count.show()
//2.日记本点击数
val
diary_clk
=
sc
.
sql
(
val
diary_clk
_temp
=
sc
.
sql
(
s
"""
|select
'${stat_date}' as stat_date,count(cl_id) as diary_clk
|select
ov.cl_id as device_id
|from online.tl_hdfs_maidian_view ov left join agency_id
|on ov.cl_id = agency_id.device_id
|where ov.action = 'on_click_diary_card'
|and ov.cl_id != "NULL"
|and ov.partition_date='${partition_date}'
|and agency_id.device_id is not null
|and ov.cl_id not in (select device_id from blacklist)
|and agency_id.device_id is null
"""
.
stripMargin
)
diary_clk
.
show
()
diary_clk_temp
.
createOrReplaceTempView
(
"diary_clk_temp"
)
val
diary_clk_device
=
sc
.
sql
(
s
"""
|select dt.device_id
|from diary_clk_temp dt left join blacklist
|on dt.device_id = blacklist.device_id
|where blacklist.device_id is null
"""
.
stripMargin
)
diary_clk_device
.
createOrReplaceTempView
(
"diary_clk_device"
)
//新用户日记本点击
val
diary_clk_newUser
=
sc
.
sql
(
s
"""
|select '${stat_date}' as stat_date, count(dd.device_id) as diary_clk_newUser
|from diary_clk_device dd left join device_id_new
|on dd.device_id = device_id_new.device_id
|where device_id_new.device_id is not null
"""
.
stripMargin
)
//老用户日记本点击
val
diary_clk_oldUser
=
sc
.
sql
(
s
"""
|select '${stat_date}' as stat_date, count(dd.device_id) as diary_clk_oldUser
|from diary_clk_device dd left join device_id_old
|on dd.device_id = device_id_old.device_id
|where device_id_old.device_id is not null
"""
.
stripMargin
)
// val diary_clk = sc.sql(
// s"""
// |select '${stat_date}' as stat_date,count(cl_id) as diary_clk
// |from online.tl_hdfs_maidian_view ov left join agency_id
// |on ov.cl_id = agency_id.device_id
// |where ov.action = 'on_click_diary_card'
// |and ov.cl_id != "NULL"
// |and ov.partition_date='${partition_date}'
// |and agency_id.device_id is null
// |and ov.cl_id not in (select device_id from blacklist)
// """.stripMargin
// )
// diary_clk.show()
//3.日记本曝光数
val
diary_expoure
=
sc
.
sql
(
val
diary_expoure
_temp
=
sc
.
sql
(
s
"""
|select
'${stat_date}' as stat_date,count(cl_id) as diary_expoure
|select
od.cl_id as device_id
|from online.ml_community_exposure_detail od left join agency_id
|on od.cl_id = agency_id.device_id
|where od.business_type = "diary"
|and od.cl_id != "NULL"
|and od.partition_date='${partition_date}'
|and agency_id.device_id is not null
|and od.cl_id not in (select device_id from blacklist)
|and agency_id.device_id is null
"""
.
stripMargin
)
diary_expoure
.
show
()
diary_expoure_temp
.
createOrReplaceTempView
(
"diary_expoure_temp"
)
val
diary_expoure_device
=
sc
.
sql
(
s
"""
|select dt.device_id
|from diary_expoure_temp dt left join blacklist
|on dt.device_id = blacklist.device_id
|where blacklist.device_id is null
"""
.
stripMargin
)
diary_expoure_device
.
createOrReplaceTempView
(
"diary_expoure_device"
)
//新用户日记本曝光
val
diary_exp_newUser
=
sc
.
sql
(
s
"""
|select '${stat_date}' as stat_date, count(dd.device_id) as diary_exp_newUser
|from diary_expoure_device dd left join device_id_new
|on dd.device_id = device_id_new.device_id
|where device_id_new.device_id is not null
"""
.
stripMargin
)
//老用户日记本曝光
val
diary_exp_oldUser
=
sc
.
sql
(
s
"""
|select '${stat_date}' as stat_date, count(dd.device_id) as diary_exp_oldUser
|from diary_expoure_device dd left join device_id_old
|on dd.device_id = device_id_old.device_id
|where device_id_old.device_id is not null
"""
.
stripMargin
)
// val diary_expoure=sc.sql(
// s"""
// |select '${stat_date}' as stat_date,count(cl_id) as diary_expoure
// |from online.ml_community_exposure_detail od left join agency_id
// |on od.cl_id = agency_id.device_id
// |where od.business_type = "diary"
// |and od.cl_id != "NULL"
// |and od.partition_date='${partition_date}'
// |and agency_id.device_id is null
// |and od.cl_id not in (select device_id from blacklist)
// """.stripMargin
// )
// diary_expoure.show()
//4.搜索次数统计
val
search_
count
=
sc
.
sql
(
val
search_
device_temp
=
sc
.
sql
(
s
"""
|select
'${stat_date}' as stat_date,count(params['query']) as search_num
|select
ov.cl_id as device_id
|from online.tl_hdfs_maidian_view ov left join agency_id
|on ov.cl_id = agency_id.device_id
|where (ov.action = 'do_search' or ov.action = 'search_result_click_search')
|and ov.partition_date ='${partition_date}'
|and agency_id.device_id is not null
|and ov.cl_id not in (select device_id from blacklist)
|and agency_id.device_id is null
"""
.
stripMargin
)
search_
count
.
show
(
)
search_
device_temp
.
createOrReplaceTempView
(
"search_device_temp"
)
//5.登录人数
val
search_device
=
sc
.
sql
(
s
"""
|select dt.device_id
|from search_device_temp dt left join blacklist
|on dt.device_id = blacklist.device_id
|where blacklist.device_id is null
"""
.
stripMargin
)
search_device
.
createOrReplaceTempView
(
"search_device"
)
//新用户搜索次数
val
search_newUser
=
sc
.
sql
(
s
"""
|select '${stat_date}' as stat_date, count(sd.device_id) as search_newUser
|from search_device sd left join device_id_new
|on sd.device_id = device_id_new.device_id
|where device_id_new.device_id is not null
"""
.
stripMargin
)
//老用户日搜索次数
val
search_oldUser
=
sc
.
sql
(
s
"""
|select '${stat_date}' as stat_date, count(sd.device_id) as search_oldUser
|from search_device sd left join device_id_old
|on sd.device_id = device_id_old.device_id
|where device_id_old.device_id is not null
"""
.
stripMargin
)
val
log_num
=
sc
.
sql
(
//5.登录人数
val
log_device_temp
=
sc
.
sql
(
s
"""
|select
'${stat_date}' as stat_date,count(distinct(device_id)) as log_num
|select
distinct(oe.device_id) as device_id
|from data_feed_exposure oe left join agency_id
|on oe.device_id = agency_id.device_id
|and oe.stat_date ='${stat_date}'
|and agency_id.device_id is not null
|and oe.device_id not in (select device_id from blacklist)
|and agency_id.device_id is null
"""
.
stripMargin
)
log_device_temp
.
createOrReplaceTempView
(
"log_device_temp"
)
val
log_device
=
sc
.
sql
(
s
"""
|select dt.device_id
|from log_device_temp dt left join blacklist
|on dt.device_id = blacklist.device_id
|where blacklist.device_id is null
"""
.
stripMargin
)
log_device
.
createOrReplaceTempView
(
"log_device"
)
//新用户登录人数
val
log_newUser
=
sc
.
sql
(
s
"""
|select '${stat_date}' as stat_date, count(distinct(ld.device_id)) as log_newUser
|from log_device ld left join device_id_new
|on ld.device_id = device_id_new.device_id
|where device_id_new.device_id is not null
"""
.
stripMargin
)
//老用户登录人数
val
log_oldUser
=
sc
.
sql
(
s
"""
|select '${stat_date}' as stat_date, count(distinct(ld.device_id)) as log_oldUser
|from log_device ld left join device_id_old
|on ld.device_id = device_id_old.device_id
|where device_id_old.device_id is not null
"""
.
stripMargin
)
log_num
.
show
()
// val log_num = sc.sql(
// s"""
// |select '${stat_date}' as stat_date,count(distinct(device_id)) as log_num
// |from data_feed_exposure oe left join agency_id
// |on oe.device_id = agency_id.device_id
// |and oe.stat_date ='${stat_date}'
// |and agency_id.device_id is null
// |and oe.device_id not in (select device_id from blacklist)
// """.stripMargin
// )
// log_num.show()
val
result
=
diary_meigou_count
.
join
(
diary_clk
,
"stat_date"
)
.
join
(
diary_expoure
,
"stat_date"
)
.
join
(
search_count
,
"stat_date"
)
.
join
(
log_num
,
"stat_date"
)
val
result
=
diary_meigou_newUser
.
join
(
diary_meigou_oldUser
,
"stat_date"
)
.
join
(
diary_clk_newUser
,
"stat_date"
)
.
join
(
diary_clk_oldUser
,
"stat_date"
)
.
join
(
diary_exp_newUser
,
"stat_date"
)
.
join
(
diary_exp_oldUser
,
"stat_date"
)
.
join
(
search_newUser
,
"stat_date"
)
.
join
(
search_oldUser
,
"stat_date"
)
.
join
(
log_newUser
,
"stat_date"
)
.
join
(
log_oldUser
,
"stat_date"
)
GmeiConfig
.
writeToJDBCTable
(
result
,
"diary_meigou_crv"
,
SaveMode
.
Append
)
...
...
eda/gray_stat/Recommendation_strategy_all.py
View file @
1fbc488a
...
...
@@ -92,7 +92,7 @@ if __name__ == '__main__':
output_path
=
OUTPUT_PATH
+
"recommendation.csv"
with
open
(
output_path
,
'a+'
)
as
f
:
line
=
my_date
.
replace
(
'-'
,
''
)
+
','
+
str
(
result1_clk
[
0
])
+
','
+
str
(
result1_clk
[
1
])
+
','
+
str
(
result1_clk
[
2
])
+
','
+
str
(
result1_clk
[
3
])
\
line
=
my_date
+
','
+
str
(
result1_clk
[
0
])
+
','
+
str
(
result1_clk
[
1
])
+
','
+
str
(
result1_clk
[
2
])
+
','
+
str
(
result1_clk
[
3
])
\
+
','
+
str
(
result1_imp
[
0
])
+
','
+
str
(
result1_imp
[
1
])
+
','
+
str
(
result1_imp
[
2
])
+
','
+
str
(
result1_imp
[
3
])
+
','
\
+
str
(
result2_clk
[
0
])
+
','
+
str
(
result2_clk
[
1
])
+
','
+
str
(
result2_clk
[
2
])
+
','
+
str
(
result2_clk
[
3
])
\
+
','
+
str
(
result2_imp
[
0
])
+
','
+
str
(
result2_imp
[
1
])
+
','
+
str
(
result2_imp
[
2
])
+
','
+
str
(
result2_imp
[
3
])
\
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment