Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
F
ffm-baseline
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ML
ffm-baseline
Commits
93bd2948
Commit
93bd2948
authored
Nov 28, 2018
by
王志伟
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
新统计需求
parent
f96b177a
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
64 additions
and
5 deletions
+64
-5
strategy_other.scala
eda/feededa/src/main/scala/com/gmei/strategy_other.scala
+64
-5
No files found.
eda/feededa/src/main/scala/com/gmei/strategy_other.scala
View file @
93bd2948
...
...
@@ -178,10 +178,10 @@ object diary_exposure {
val
spark_env
=
GmeiConfig
.
getSparkSession
()
val
sc
=
spark_env
.
_2
val
ti
=
new
TiContext
(
sc
)
// ti.tidbMapTable(dbName = "mimas_prod", tableName = "api_diary_tags
")
ti
.
tidbMapTable
(
dbName
=
"eagle"
,
tableName
=
"src_mimas_prod_api_diary
"
)
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
ti
.
tidbMapTable
(
dbName
=
"jerry_prod"
,
tableName
=
"data_feed_exposure"
)
// ti.tidbMapTable(dbName = "zhengxing", tableName = "api_tag
")
ti
.
tidbMapTable
(
dbName
=
"jerry_prod"
,
tableName
=
"blacklist
"
)
val
mimas_url
=
"jdbc:mysql://rr-m5et21lafq1677pid.mysql.rds.aliyuncs.com/mimas_prod"
val
mimas_user
=
"mimas"
...
...
@@ -226,6 +226,25 @@ object diary_exposure {
agency_id
.
show
()
agency_id
.
createOrReplaceTempView
(
"agency_id"
)
//当日首页精选总曝光,去除结构和黑名单
val
imp_count
=
sc
.
sql
(
s
"""
|select count(cid_id) as imp_num
|from data_feed_exposure de left join agency_id
|on de.device_id = agency_id.device_id
|where de.cid_type = 'diary'
|and agency_id.device_id is null
|and de.stat_date ='2018-11-26'
|and de.device_id not in (select distinct(device_id) from blacklist)
"""
.
stripMargin
)
imp_count
.
show
()
//
//曝光表中的日记id,去除机构和黑名单
val
diary_id_temp
=
sc
.
sql
(
s
"""
|select cid_id as diary_id
...
...
@@ -234,13 +253,12 @@ object diary_exposure {
|where de.cid_type = 'diary'
|and agency_id.device_id is null
|and de.stat_date ='2018-11-26'
|and de.device_id not in (select distinct(device_id) from blacklist)
"""
.
stripMargin
)
diary_id_temp
.
createOrReplaceTempView
(
"diary_id_temp"
)
val
diary_id
=
diary_id_temp
.
rdd
.
map
(
x
=>
x
(
0
).
toString
).
collect
()
val
cid_tag
=
s
"""
|select diary_id,tag_id from api_diary_tags
...
...
@@ -260,7 +278,48 @@ object diary_exposure {
val
final_cid_city
=
diary_id_temp
.
join
(
df_cid_city
,
Seq
(
"diary_id"
),
"left_outer"
)
final_cid_city
.
show
()
final_cid_city
.
groupBy
(
"name"
).
count
().
orderBy
(
desc
(
"count"
)).
show
(
400
)
val
df1
=
final_cid_city
.
groupBy
(
"name"
).
count
().
orderBy
(
desc
(
"count"
))
df1
.
show
(
400
)
//3.5星以上日记本的id
val
diary_id_temp2
=
sc
.
sql
(
s
"""
|select id as diary_id
|from src_mimas_prod_api_diary
|where content_level >=3.5
|and doctor_id is not null
"""
.
stripMargin
)
diary_id_temp2
.
createOrReplaceTempView
(
"diary_id_temp2"
)
val
diary_id2
=
diary_id_temp2
.
rdd
.
map
(
x
=>
x
(
0
).
toString
).
collect
()
val
cid_tag2
=
s
"""
|select diary_id,tag_id from api_diary_tags
|where diary_id in (${diary_id2.map(x => s"'$x'").mkString(",")})
"""
.
stripMargin
val
cid_city2
=
mysql_df
(
sc
,
mimas_url
,
"api_diary_tags"
,
mimas_user
,
mimas_password
,
cid_tag2
)
val
tag_list2
=
cid_city2
.
select
(
"tag_id"
).
collect
().
map
(
x
=>
x
(
0
).
toString
).
distinct
val
tag_city2
=
s
"""
|select id,name from api_tag where tag_type = 4
|and id in (${tag_list2.map(x => s"'$x'").mkString(",")})
"""
.
stripMargin
val
city_df2
=
mysql_df
(
sc
,
zhengxing_url
,
"api_tag"
,
zhengxing_user
,
zhengxing_password
,
tag_city2
)
.
na
.
drop
().
withColumnRenamed
(
"id"
,
"tag_id"
)
val
df_cid_city2
=
cid_city2
.
join
(
city_df2
,
Seq
(
"tag_id"
),
"left_outer"
).
na
.
drop
()
.
drop
(
"tag_id"
)
val
final_cid_city2
=
diary_id_temp2
.
join
(
df_cid_city2
,
Seq
(
"diary_id"
),
"left_outer"
)
final_cid_city2
.
show
()
val
df2
=
final_cid_city2
.
groupBy
(
"name"
).
count
().
orderBy
(
desc
(
"count"
))
df2
.
show
(
400
)
val
df3
=
df1
.
join
(
df2
,
Seq
(
"name"
),
"left_outer"
)
df3
.
show
(
400
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment