Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
C
ctr-estimate
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
赵建伟
ctr-estimate
Commits
5364b4d3
Commit
5364b4d3
authored
Apr 10, 2020
by
赵建伟
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update codes
parent
157c8226
Hide whitespace changes
Inline
Side-by-side
Showing
17 changed files
with
993 additions
and
204 deletions
+993
-204
start.sh
bin/start.sh
+1
-1
start_clk.sh
bin/start_clk.sh
+1
-1
start_tag.sh
bin/start_tag.sh
+1
-1
ctr-estimate.sql
libs/ctr-estimate.sql
+36
-20
ProdCtrEstimateMain.java
src/main/java/com/gmei/data/ctr/ProdCtrEstimateMain.java
+1
-5
CtrEstimatePfrEtl.java
src/main/java/com/gmei/data/ctr/bean/CtrEstimatePfrEtl.java
+77
-0
DeviceCurrentEstimatePfr.java
...java/com/gmei/data/ctr/bean/DeviceCurrentEstimatePfr.java
+230
-0
DeviceCurrentEstimatePfrTmp.java
...a/com/gmei/data/ctr/bean/DeviceCurrentEstimatePfrTmp.java
+157
-0
ProdCtrEstimateMainPfr.java
...n/java/com/gmei/data/ctr/main/ProdCtrEstimateMainPfr.java
+25
-12
TestCtrEstimateMainPfr.java
...n/java/com/gmei/data/ctr/main/TestCtrEstimateMainPfr.java
+21
-17
CtrEstimatePfrOperator.java
...va/com/gmei/data/ctr/operator/CtrEstimatePfrOperator.java
+127
-139
CtrEstimateTagOperator.java
...va/com/gmei/data/ctr/operator/CtrEstimateTagOperator.java
+4
-4
CtrEstimatePfrMysqlSink.java
.../java/com/gmei/data/ctr/sink/CtrEstimatePfrMysqlSink.java
+137
-0
TidbMysqlAsyncPfrSource.java
...ava/com/gmei/data/ctr/source/TidbMysqlAsyncPfrSource.java
+158
-0
TidbMysqlAsyncTagSource.java
...ava/com/gmei/data/ctr/source/TidbMysqlAsyncTagSource.java
+2
-2
ZxMysqlAsyncTagSource.java
.../java/com/gmei/data/ctr/source/ZxMysqlAsyncTagSource.java
+2
-2
DateUtils.java
src/main/java/com/gmei/data/ctr/utils/DateUtils.java
+13
-0
No files found.
bin/start.sh
View file @
5364b4d3
...
...
@@ -8,7 +8,7 @@ export JAVA_OPTS="-Xms1024m -Xmx8192m -XX:-UseGCOverheadLimit -XX:+UseConcMarkSw
nohup
$FLINK_HOME
/bin/flink run
\
-m
yarn-cluster
\
-ynm
ctr-estimate
\
-yqu
data
\
-yqu
hadoop
\
-yn
2
\
-ys
2
\
-p
6
\
...
...
bin/start_clk.sh
View file @
5364b4d3
...
...
@@ -8,7 +8,7 @@ export JAVA_OPTS="-Xms1024m -Xmx8192m -XX:-UseGCOverheadLimit -XX:+UseConcMarkSw
nohup
$FLINK_HOME
/bin/flink run
\
-m
yarn-cluster
\
-ynm
ctr-estimate-clk
\
-yqu
data
\
-yqu
hadoop
\
-yn
2
\
-ys
2
\
-p
6
\
...
...
bin/start_tag.sh
View file @
5364b4d3
...
...
@@ -8,7 +8,7 @@ export JAVA_OPTS="-Xms1024m -Xmx8192m -XX:-UseGCOverheadLimit -XX:+UseConcMarkSw
nohup
$FLINK_HOME
/bin/flink run
\
-m
yarn-cluster
\
-ynm
ctr-estimate-tag
\
-yqu
data
\
-yqu
hadoop
\
-yn
2
\
-ys
2
\
-p
6
\
...
...
libs/ctr-estimate.sql
View file @
5364b4d3
...
...
@@ -8,15 +8,6 @@ CREATE TABLE `device_current_estimate_clk` (
`content_card_click`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'日记贴点击量'
,
`tractate_card_click`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'用户贴点击量'
,
`answer_card_click`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'问答贴点击量'
,
`like_diary_count`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'点赞日记数量'
,
`like_card_count`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'点赞帖子数量'
,
`like_answer_count`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'点赞回答数量'
,
`discuss_diary_count`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'评论日记数量'
,
`discuss_card_count`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'评论帖子数量'
,
`discuss_answer_count`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'评论回答数量'
,
`collect_diary_count`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'收藏日记数量'
,
`collect_card_count`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'收藏帖子数量'
,
`collect_answer_count`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'收藏问答数量'
,
`partition_date`
varchar
(
45
)
DEFAULT
NULL
COMMENT
'日期'
,
`last_update_time`
varchar
(
45
)
DEFAULT
NULL
COMMENT
'上一次更改的时间'
,
PRIMARY
KEY
(
`id`
)
...
...
@@ -61,19 +52,44 @@ CREATE TABLE `device_current_estimate_tag_unplat` (
PRIMARY
KEY
(
`id`
)
)
ENGINE
=
InnoDB
DEFAULT
CHARSET
=
utf8mb4
;
-- CTR特征预估标签平台表
CREATE
TABLE
`device_recent_estimate_view_pfr`
(
`id`
bigint
(
20
)
NOT
NULL
AUTO_INCREMENT
COMMENT
'自增ID'
,
`device_id`
varchar
(
150
)
DEFAULT
NULL
COMMENT
'设备ID'
,
`diary_preference`
text
COMMENT
'日记偏好'
,
`qa_preference`
text
COMMENT
'问答偏好'
,
`card_preference`
text
COMMENT
'帖子偏好'
,
`service_preference`
text
COMMENT
'美购偏好'
,
`partition_date`
varchar
(
45
)
DEFAULT
NULL
COMMENT
'日期'
,
`last_update_time`
varchar
(
45
)
DEFAULT
NULL
COMMENT
'上一次更改的时间'
,
PRIMARY
KEY
(
`id`
)
-- CTR特征预估最近偏好表
CREATE
TABLE
`device_recently_estimate_view_pfr`
(
`id`
bigint
(
20
)
NOT
NULL
AUTO_INCREMENT
COMMENT
'自增ID'
,
`device_id`
varchar
(
150
)
DEFAULT
NULL
COMMENT
'设备ID'
,
`statistics_type`
varchar
(
150
)
DEFAULT
NULL
COMMENT
'统计类型 01:美购,02:日记,03:帖子,04:问答'
,
`statistics_type_id`
varchar
(
150
)
DEFAULT
NULL
COMMENT
'统计类型id'
,
`project_pfr`
text
COMMENT
'项目偏好'
,
`first_demands_pfr`
text
COMMENT
'一级诉求偏好'
,
`first_positions_pfr`
text
COMMENT
'一级部位偏好'
,
`first_solutions_pfr`
text
COMMENT
'一级方式偏好'
,
`second_demands_pfr`
text
COMMENT
'二级诉求偏好'
,
`second_positions_pfr`
text
COMMENT
'二级部位偏好'
,
`second_solutions_pfr`
text
COMMENT
'二级方式偏好'
,
`partition_date`
varchar
(
45
)
DEFAULT
NULL
COMMENT
'日期'
,
`last_update_time`
varchar
(
45
)
DEFAULT
NULL
COMMENT
'上一次更改的时间'
,
PRIMARY
KEY
(
`id`
)
)
ENGINE
=
InnoDB
DEFAULT
CHARSET
=
utf8mb4
;
-- CTR特征预估点击量表
#
CREATE
TABLE
`device_current_estimate_clk`
(
#
`id`
bigint
(
20
)
NOT
NULL
AUTO_INCREMENT
COMMENT
'自增ID'
,
#
`device_id`
varchar
(
150
)
DEFAULT
NULL
COMMENT
'设备ID'
,
#
`content_card_click`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'日记贴点击量'
,
#
`tractate_card_click`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'用户贴点击量'
,
#
`answer_card_click`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'问答贴点击量'
,
#
`like_diary_count`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'点赞日记数量'
,
#
`like_card_count`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'点赞帖子数量'
,
#
`like_answer_count`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'点赞回答数量'
,
#
`discuss_diary_count`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'评论日记数量'
,
#
`discuss_card_count`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'评论帖子数量'
,
#
`discuss_answer_count`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'评论回答数量'
,
#
`collect_diary_count`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'收藏日记数量'
,
#
`collect_card_count`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'收藏帖子数量'
,
#
`collect_answer_count`
bigint
(
20
)
DEFAULT
NULL
COMMENT
'收藏问答数量'
,
#
`partition_date`
varchar
(
45
)
DEFAULT
NULL
COMMENT
'日期'
,
#
`last_update_time`
varchar
(
45
)
DEFAULT
NULL
COMMENT
'上一次更改的时间'
,
#
PRIMARY
KEY
(
`id`
)
#
)
ENGINE
=
InnoDB
DEFAULT
CHARSET
=
utf8mb4
;
-- CTR特征预估标签表
#
CREATE
TABLE
`device_current_estimate`
(
...
...
src/main/java/com/gmei/data/ctr/ProdCtrEstimateMain.java
View file @
5364b4d3
...
...
@@ -43,7 +43,6 @@ public class ProdCtrEstimateMain {
String
inJerryJdbcUrl
=
parameterTool
.
get
(
"inJerryJdbcUrl"
,
"jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
inJerryUsername
=
parameterTool
.
get
(
"inJerryUsername"
,
"data_user"
);
String
inJerryPassword
=
parameterTool
.
get
(
"inJerryPassword"
,
"YPEzp78HQBuhByWPpefQu6X3D6hEPfD6"
);
// 核心参数打印
System
.
out
.
println
(
"**********************************************************"
);
System
.
out
.
println
(
"*** inBrokers: "
+
inBrokers
);
...
...
@@ -61,7 +60,6 @@ public class ProdCtrEstimateMain {
System
.
out
.
println
(
"*** inJerryUsername: "
+
inJerryUsername
);
System
.
out
.
println
(
"*** inJerryPassword: "
+
inJerryPassword
);
System
.
out
.
println
(
"**********************************************************"
);
// 获得流处理环境对象
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
...
...
@@ -70,7 +68,7 @@ public class ProdCtrEstimateMain {
env
.
setRestartStrategy
(
RestartStrategies
.
fixedDelayRestart
(
1
,
3000
));
CheckpointConfig
config
=
env
.
getCheckpointConfig
();
config
.
enableExternalizedCheckpoints
(
CheckpointConfig
.
ExternalizedCheckpointCleanup
.
RETAIN_ON_CANCELLATION
);
// 输入源
DataStream
MaidianDataStream
=
new
MaidianKafkaSource
(
env
,
inBrokers
,
...
...
@@ -81,7 +79,6 @@ public class ProdCtrEstimateMain {
isStartFromLatest
,
startTime
).
getInstance
();
// 执行处理核心逻辑
new
CtrEstimateClkOperator
(
MaidianDataStream
,
outJdbcUrl
,
...
...
@@ -107,7 +104,6 @@ public class ProdCtrEstimateMain {
inJerryUsername
,
inJerryPassword
).
run
();
// 常驻执行
env
.
execute
(
"ctr-estimate"
);
}
...
...
src/main/java/com/gmei/data/ctr/bean/CtrEstimatePfrEtl.java
0 → 100644
View file @
5364b4d3
package
com
.
gmei
.
data
.
ctr
.
bean
;
/**
* @ClassName CtrEstimatePfrEtl
* @Author zhaojianwei
* @Date 2020/4/1
* @Version V1.0
**/
public
class
CtrEstimatePfrEtl
{
private
String
deviceId
;
private
String
statisticsType
;
private
String
statisticsTypeId
;
private
String
partitionDate
;
private
String
lastUpdateTime
;
public
CtrEstimatePfrEtl
(
String
deviceId
,
String
statisticsType
,
String
statisticsTypeId
,
String
partitionDate
,
String
lastUpdateTime
)
{
this
.
deviceId
=
deviceId
;
this
.
statisticsType
=
statisticsType
;
this
.
statisticsTypeId
=
statisticsTypeId
;
this
.
partitionDate
=
partitionDate
;
this
.
lastUpdateTime
=
lastUpdateTime
;
}
public
CtrEstimatePfrEtl
()
{
}
public
String
getDeviceId
()
{
return
deviceId
;
}
public
void
setDeviceId
(
String
deviceId
)
{
this
.
deviceId
=
deviceId
;
}
public
String
getStatisticsType
()
{
return
statisticsType
;
}
public
void
setStatisticsType
(
String
statisticsType
)
{
this
.
statisticsType
=
statisticsType
;
}
public
String
getStatisticsTypeId
()
{
return
statisticsTypeId
;
}
public
void
setStatisticsTypeId
(
String
statisticsTypeId
)
{
this
.
statisticsTypeId
=
statisticsTypeId
;
}
public
String
getPartitionDate
()
{
return
partitionDate
;
}
public
void
setPartitionDate
(
String
partitionDate
)
{
this
.
partitionDate
=
partitionDate
;
}
public
String
getLastUpdateTime
()
{
return
lastUpdateTime
;
}
public
void
setLastUpdateTime
(
String
lastUpdateTime
)
{
this
.
lastUpdateTime
=
lastUpdateTime
;
}
@Override
public
String
toString
()
{
return
"CtrEstimatePfrEtl{"
+
"deviceId='"
+
deviceId
+
'\''
+
", statisticsType='"
+
statisticsType
+
'\''
+
", statisticsTypeId='"
+
statisticsTypeId
+
'\''
+
", partitionDate='"
+
partitionDate
+
'\''
+
", lastUpdateTime='"
+
lastUpdateTime
+
'\''
+
'}'
;
}
}
src/main/java/com/gmei/data/ctr/bean/DeviceCurrentEstimatePfr.java
0 → 100644
View file @
5364b4d3
package
com
.
gmei
.
data
.
ctr
.
bean
;
/**
* @ClassName DeviceCurrentEstimateTag
* @Author apple
* @Date 2020/3/31
* @Version V1.0
**/
public
class
DeviceCurrentEstimatePfr
{
private
String
deviceId
;
private
String
platFirstPositions
;
private
String
platFirstSolutions
;
private
String
platFirstDemands
;
private
String
platProject
;
private
String
contentFirstPositions
;
private
String
contentFirstSolutions
;
private
String
contentFirstDemands
;
private
String
contentProject
;
private
String
commodityFirstPositions
;
private
String
commodityFirstSolutions
;
private
String
commodityFirstDemands
;
private
String
commodityProject
;
private
String
platSecondPositions
;
private
String
platSecondSolutions
;
private
String
platSecondDemands
;
private
String
contentSecondPositions
;
private
String
contentSecondSolutions
;
private
String
contentSecondDemands
;
private
String
commoditySecondPositions
;
private
String
commoditySecondSolutions
;
private
String
commoditySecondDemands
;
private
String
partitionDate
;
private
String
lastUpdateTime
;
public
DeviceCurrentEstimatePfr
()
{
}
public
String
getDeviceId
()
{
return
deviceId
;
}
public
void
setDeviceId
(
String
deviceId
)
{
this
.
deviceId
=
deviceId
;
}
public
String
getPlatFirstPositions
()
{
return
platFirstPositions
;
}
public
void
setPlatFirstPositions
(
String
platFirstPositions
)
{
this
.
platFirstPositions
=
platFirstPositions
;
}
public
String
getPlatFirstSolutions
()
{
return
platFirstSolutions
;
}
public
void
setPlatFirstSolutions
(
String
platFirstSolutions
)
{
this
.
platFirstSolutions
=
platFirstSolutions
;
}
public
String
getPlatFirstDemands
()
{
return
platFirstDemands
;
}
public
void
setPlatFirstDemands
(
String
platFirstDemands
)
{
this
.
platFirstDemands
=
platFirstDemands
;
}
public
String
getPlatProject
()
{
return
platProject
;
}
public
void
setPlatProject
(
String
platProject
)
{
this
.
platProject
=
platProject
;
}
public
String
getContentFirstPositions
()
{
return
contentFirstPositions
;
}
public
void
setContentFirstPositions
(
String
contentFirstPositions
)
{
this
.
contentFirstPositions
=
contentFirstPositions
;
}
public
String
getContentFirstSolutions
()
{
return
contentFirstSolutions
;
}
public
void
setContentFirstSolutions
(
String
contentFirstSolutions
)
{
this
.
contentFirstSolutions
=
contentFirstSolutions
;
}
public
String
getContentFirstDemands
()
{
return
contentFirstDemands
;
}
public
void
setContentFirstDemands
(
String
contentFirstDemands
)
{
this
.
contentFirstDemands
=
contentFirstDemands
;
}
public
String
getContentProject
()
{
return
contentProject
;
}
public
void
setContentProject
(
String
contentProject
)
{
this
.
contentProject
=
contentProject
;
}
public
String
getCommodityFirstPositions
()
{
return
commodityFirstPositions
;
}
public
void
setCommodityFirstPositions
(
String
commodityFirstPositions
)
{
this
.
commodityFirstPositions
=
commodityFirstPositions
;
}
public
String
getCommodityFirstSolutions
()
{
return
commodityFirstSolutions
;
}
public
void
setCommodityFirstSolutions
(
String
commodityFirstSolutions
)
{
this
.
commodityFirstSolutions
=
commodityFirstSolutions
;
}
public
String
getCommodityFirstDemands
()
{
return
commodityFirstDemands
;
}
public
void
setCommodityFirstDemands
(
String
commodityFirstDemands
)
{
this
.
commodityFirstDemands
=
commodityFirstDemands
;
}
public
String
getCommodityProject
()
{
return
commodityProject
;
}
public
void
setCommodityProject
(
String
commodityProject
)
{
this
.
commodityProject
=
commodityProject
;
}
public
String
getPlatSecondPositions
()
{
return
platSecondPositions
;
}
public
void
setPlatSecondPositions
(
String
platSecondPositions
)
{
this
.
platSecondPositions
=
platSecondPositions
;
}
public
String
getPlatSecondSolutions
()
{
return
platSecondSolutions
;
}
public
void
setPlatSecondSolutions
(
String
platSecondSolutions
)
{
this
.
platSecondSolutions
=
platSecondSolutions
;
}
public
String
getPlatSecondDemands
()
{
return
platSecondDemands
;
}
public
void
setPlatSecondDemands
(
String
platSecondDemands
)
{
this
.
platSecondDemands
=
platSecondDemands
;
}
public
String
getContentSecondPositions
()
{
return
contentSecondPositions
;
}
public
void
setContentSecondPositions
(
String
contentSecondPositions
)
{
this
.
contentSecondPositions
=
contentSecondPositions
;
}
public
String
getContentSecondSolutions
()
{
return
contentSecondSolutions
;
}
public
void
setContentSecondSolutions
(
String
contentSecondSolutions
)
{
this
.
contentSecondSolutions
=
contentSecondSolutions
;
}
public
String
getContentSecondDemands
()
{
return
contentSecondDemands
;
}
public
void
setContentSecondDemands
(
String
contentSecondDemands
)
{
this
.
contentSecondDemands
=
contentSecondDemands
;
}
public
String
getCommoditySecondPositions
()
{
return
commoditySecondPositions
;
}
public
void
setCommoditySecondPositions
(
String
commoditySecondPositions
)
{
this
.
commoditySecondPositions
=
commoditySecondPositions
;
}
public
String
getCommoditySecondSolutions
()
{
return
commoditySecondSolutions
;
}
public
void
setCommoditySecondSolutions
(
String
commoditySecondSolutions
)
{
this
.
commoditySecondSolutions
=
commoditySecondSolutions
;
}
public
String
getCommoditySecondDemands
()
{
return
commoditySecondDemands
;
}
public
void
setCommoditySecondDemands
(
String
commoditySecondDemands
)
{
this
.
commoditySecondDemands
=
commoditySecondDemands
;
}
public
String
getPartitionDate
()
{
return
partitionDate
;
}
public
void
setPartitionDate
(
String
partitionDate
)
{
this
.
partitionDate
=
partitionDate
;
}
public
String
getLastUpdateTime
()
{
return
lastUpdateTime
;
}
public
void
setLastUpdateTime
(
String
lastUpdateTime
)
{
this
.
lastUpdateTime
=
lastUpdateTime
;
}
}
src/main/java/com/gmei/data/ctr/bean/DeviceCurrentEstimatePfrTmp.java
0 → 100644
View file @
5364b4d3
package
com
.
gmei
.
data
.
ctr
.
bean
;
/**
* @ClassName DeviceCurrentEstimatePfrTmp
* @Author apple
* @Date 2020/3/31
* @Version V1.0
**/
public
class
DeviceCurrentEstimatePfrTmp
{
private
String
deviceId
;
private
String
statisticsType
;
private
String
statisticsTypeId
;
private
String
projectPfr
;
private
String
firstDemandsPfr
;
private
String
firstPositionsPfr
;
private
String
firstSolutionsPfr
;
private
String
secondDemandsPfr
;
private
String
secondPositionsPfr
;
private
String
secondSolutionsPfr
;
private
String
partitionDate
;
private
String
lastUpdateTime
;
public
DeviceCurrentEstimatePfrTmp
(
String
deviceId
,
String
statisticsType
,
String
statisticsTypeId
,
String
projectPfr
,
String
firstDemandsPfr
,
String
firstPositionsPfr
,
String
firstSolutionsPfr
,
String
secondDemandsPfr
,
String
secondPositionsPfr
,
String
secondSolutionsPfr
,
String
partitionDate
,
String
lastUpdateTime
)
{
this
.
deviceId
=
deviceId
;
this
.
statisticsType
=
statisticsType
;
this
.
statisticsTypeId
=
statisticsTypeId
;
this
.
projectPfr
=
projectPfr
;
this
.
firstDemandsPfr
=
firstDemandsPfr
;
this
.
firstPositionsPfr
=
firstPositionsPfr
;
this
.
firstSolutionsPfr
=
firstSolutionsPfr
;
this
.
secondDemandsPfr
=
secondDemandsPfr
;
this
.
secondPositionsPfr
=
secondPositionsPfr
;
this
.
secondSolutionsPfr
=
secondSolutionsPfr
;
this
.
partitionDate
=
partitionDate
;
this
.
lastUpdateTime
=
lastUpdateTime
;
}
public
DeviceCurrentEstimatePfrTmp
()
{
}
public
String
getDeviceId
()
{
return
deviceId
;
}
public
void
setDeviceId
(
String
deviceId
)
{
this
.
deviceId
=
deviceId
;
}
public
String
getStatisticsType
()
{
return
statisticsType
;
}
public
void
setStatisticsType
(
String
statisticsType
)
{
this
.
statisticsType
=
statisticsType
;
}
public
String
getStatisticsTypeId
()
{
return
statisticsTypeId
;
}
public
void
setStatisticsTypeId
(
String
statisticsTypeId
)
{
this
.
statisticsTypeId
=
statisticsTypeId
;
}
public
String
getProjectPfr
()
{
return
projectPfr
;
}
public
void
setProjectPfr
(
String
projectPfr
)
{
this
.
projectPfr
=
projectPfr
;
}
public
String
getFirstDemandsPfr
()
{
return
firstDemandsPfr
;
}
public
void
setFirstDemandsPfr
(
String
firstDemandsPfr
)
{
this
.
firstDemandsPfr
=
firstDemandsPfr
;
}
public
String
getFirstPositionsPfr
()
{
return
firstPositionsPfr
;
}
public
void
setFirstPositionsPfr
(
String
firstPositionsPfr
)
{
this
.
firstPositionsPfr
=
firstPositionsPfr
;
}
public
String
getFirstSolutionsPfr
()
{
return
firstSolutionsPfr
;
}
public
void
setFirstSolutionsPfr
(
String
firstSolutionsPfr
)
{
this
.
firstSolutionsPfr
=
firstSolutionsPfr
;
}
public
String
getSecondDemandsPfr
()
{
return
secondDemandsPfr
;
}
public
void
setSecondDemandsPfr
(
String
secondDemandsPfr
)
{
this
.
secondDemandsPfr
=
secondDemandsPfr
;
}
public
String
getSecondPositionsPfr
()
{
return
secondPositionsPfr
;
}
public
void
setSecondPositionsPfr
(
String
secondPositionsPfr
)
{
this
.
secondPositionsPfr
=
secondPositionsPfr
;
}
public
String
getSecondSolutionsPfr
()
{
return
secondSolutionsPfr
;
}
public
void
setSecondSolutionsPfr
(
String
secondSolutionsPfr
)
{
this
.
secondSolutionsPfr
=
secondSolutionsPfr
;
}
public
String
getPartitionDate
()
{
return
partitionDate
;
}
public
void
setPartitionDate
(
String
partitionDate
)
{
this
.
partitionDate
=
partitionDate
;
}
public
String
getLastUpdateTime
()
{
return
lastUpdateTime
;
}
public
void
setLastUpdateTime
(
String
lastUpdateTime
)
{
this
.
lastUpdateTime
=
lastUpdateTime
;
}
@Override
public
String
toString
()
{
return
"DeviceCurrentEstimatePfrTmp{"
+
"deviceId='"
+
deviceId
+
'\''
+
", statisticsType='"
+
statisticsType
+
'\''
+
", statisticsTypeId='"
+
statisticsTypeId
+
'\''
+
", projectPfr='"
+
projectPfr
+
'\''
+
", firstDemandsPfr='"
+
firstDemandsPfr
+
'\''
+
", firstPositionsPfr='"
+
firstPositionsPfr
+
'\''
+
", firstSolutionsPfr='"
+
firstSolutionsPfr
+
'\''
+
", secondDemandsPfr='"
+
secondDemandsPfr
+
'\''
+
", secondPositionsPfr='"
+
secondPositionsPfr
+
'\''
+
", secondSolutionsPfr='"
+
secondSolutionsPfr
+
'\''
+
", partitionDate='"
+
partitionDate
+
'\''
+
", lastUpdateTime='"
+
lastUpdateTime
+
'\''
+
'}'
;
}
}
src/main/java/com/gmei/data/ctr/main/ProdCtrEstimateMainPfr.java
View file @
5364b4d3
package
com
.
gmei
.
data
.
ctr
.
main
;
import
com.gmei.data.ctr.operator.CtrEstimateClkOperator
;
import
com.gmei.data.ctr.operator.CtrEstimatePfrOperator
;
import
com.gmei.data.ctr.source.MaidianKafkaSource
;
import
org.apache.flink.api.common.restartstrategy.RestartStrategies
;
import
org.apache.flink.api.java.utils.ParameterTool
;
import
org.apache.flink.runtime.state.filesystem.FsStateBackend
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.environment.CheckpointConfig
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
/**
...
...
@@ -34,7 +36,10 @@ public class ProdCtrEstimateMainPfr {
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
false
);
String
startTime
=
parameterTool
.
get
(
"startTime"
);
Integer
parallelism
=
parameterTool
.
getInt
(
"parallelism"
,
2
);
String
inJerryJdbcUrl
=
parameterTool
.
get
(
"inJerryJdbcUrl"
,
"jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
inJerryUsername
=
parameterTool
.
get
(
"inJerryUsername"
,
"data_user"
);
String
inJerryPassword
=
parameterTool
.
get
(
"inJerryPassword"
,
"YPEzp78HQBuhByWPpefQu6X3D6hEPfD6"
);
// 参数打印
System
.
out
.
println
(
"**********************************************************"
);
System
.
out
.
println
(
"*** inBrokers: "
+
inBrokers
);
System
.
out
.
println
(
"*** maidianInTopic: "
+
maidianInTopic
);
...
...
@@ -45,16 +50,15 @@ public class ProdCtrEstimateMainPfr {
System
.
out
.
println
(
"*** windowSize: "
+
windowSize
);
System
.
out
.
println
(
"*** slideSize: "
+
slideSize
);
System
.
out
.
println
(
"**********************************************************"
);
// 获得流处理环境对象
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//
env.enableCheckpointing(1000);
//
env.setStateBackend(new FsStateBackend(checkpointPath));
//
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
//
CheckpointConfig config = env.getCheckpointConfig();
//
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env
.
enableCheckpointing
(
1000
);
env
.
setStateBackend
(
new
FsStateBackend
(
checkpointPath
));
env
.
setRestartStrategy
(
RestartStrategies
.
fixedDelayRestart
(
1
,
3000
));
CheckpointConfig
config
=
env
.
getCheckpointConfig
();
config
.
enableExternalizedCheckpoints
(
CheckpointConfig
.
ExternalizedCheckpointCleanup
.
RETAIN_ON_CANCELLATION
);
// 数据输入源
DataStream
MaidianDataStream
=
new
MaidianKafkaSource
(
env
,
inBrokers
,
...
...
@@ -65,10 +69,19 @@ public class ProdCtrEstimateMainPfr {
isStartFromLatest
,
startTime
).
getInstance
();
// 执行处理核心逻辑
new
CtrEstimatePfrOperator
(
MaidianDataStream
,
outJdbcUrl
,
maxRetry
,
retryInteral
,
parallelism
,
windowSize
,
slideSize
).
run
();
new
CtrEstimatePfrOperator
(
MaidianDataStream
,
outJdbcUrl
,
maxRetry
,
retryInteral
,
parallelism
,
windowSize
,
slideSize
,
inJerryJdbcUrl
,
inJerryUsername
,
inJerryPassword
).
run
();
// 常驻执行
env
.
execute
(
"ctr-estimate-clk"
);
}
...
...
src/main/java/com/gmei/data/ctr/main/TestCtrEstimateMainPfr.java
View file @
5364b4d3
...
...
@@ -14,26 +14,28 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* @Version V1.0
**/
public
class
TestCtrEstimateMainPfr
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
// 获取运行参数
ParameterTool
parameterTool
=
ParameterTool
.
fromArgs
(
args
);
String
inBrokers
=
parameterTool
.
get
(
"inBrokers"
,
"test003:9092"
);
String
batchSize
=
parameterTool
.
get
(
"batchSize"
,
"1000"
);
String
maidianInTopic
=
parameterTool
.
get
(
"maidianInTopic"
,
"test11"
);
String
maidianInGroupId
=
parameterTool
.
get
(
"maidianInGroupId"
,
"
test-ctr-estimate-pfr
"
);
Integer
windowSize
=
parameterTool
.
getInt
(
"windowSize"
,
60
);
Integer
slideSize
=
parameterTool
.
getInt
(
"slideSize"
,
60
);
String
maidianInGroupId
=
parameterTool
.
get
(
"maidianInGroupId"
,
"
ctr-estimate-tag
"
);
Integer
windowSize
=
parameterTool
.
getInt
(
"windowSize"
,
5
);
Integer
slideSize
=
parameterTool
.
getInt
(
"slideSize"
,
5
);
String
outJdbcUrl
=
parameterTool
.
get
(
"outJdbcUrl"
,
"jdbc:mysql://172.18.44.3:3306/jerry_test?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false"
);
Integer
maxRetry
=
parameterTool
.
getInt
(
"maxRetry"
,
3
);
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
String
checkpointPath
=
parameterTool
.
get
(
"checkpointPath"
,
"hdfs://bj-gmei-hdfs/user/data/flink/ctr-estimate/checkpoint"
);
Boolean
isStartFromEarliest
=
parameterTool
.
getBoolean
(
"isStartFromEarliest"
,
fals
e
);
Boolean
isStartFromEarliest
=
parameterTool
.
getBoolean
(
"isStartFromEarliest"
,
tru
e
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
false
);
String
startTime
=
parameterTool
.
get
(
"startTime"
);
Integer
parallelism
=
parameterTool
.
getInt
(
"parallelism"
,
2
);
String
inJerryJdbcUrl
=
parameterTool
.
get
(
"inJerryJdbcUrl"
,
"jdbc:mysql://172.16.40.170:4000/jerry_test?characterEncoding=UTF-8&autoReconnect=true&useSSL=false"
);
String
inJerryUsername
=
parameterTool
.
get
(
"inJerryUsername"
,
"data_user"
);
String
inJerryPassword
=
parameterTool
.
get
(
"inJerryPassword"
,
"YPEzp78HQBuhByWPpefQu6X3D6hEPfD6"
);
// 参数打印
System
.
out
.
println
(
"**********************************************************"
);
System
.
out
.
println
(
"*** inBrokers: "
+
inBrokers
);
System
.
out
.
println
(
"*** maidianInTopic: "
+
maidianInTopic
);
...
...
@@ -44,16 +46,9 @@ public class TestCtrEstimateMainPfr {
System
.
out
.
println
(
"*** windowSize: "
+
windowSize
);
System
.
out
.
println
(
"*** slideSize: "
+
slideSize
);
System
.
out
.
println
(
"**********************************************************"
);
// 获得流处理环境对象
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// env.enableCheckpointing(1000);
// env.setStateBackend(new FsStateBackend(checkpointPath));
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
// CheckpointConfig config = env.getCheckpointConfig();
// config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 数据输入源
DataStream
MaidianDataStream
=
new
MaidianKafkaSource
(
env
,
inBrokers
,
...
...
@@ -64,10 +59,19 @@ public class TestCtrEstimateMainPfr {
isStartFromLatest
,
startTime
).
getInstance
();
// 执行处理核心逻辑
new
CtrEstimatePfrOperator
(
MaidianDataStream
,
outJdbcUrl
,
maxRetry
,
retryInteral
,
parallelism
,
windowSize
,
slideSize
).
run
();
new
CtrEstimatePfrOperator
(
MaidianDataStream
,
outJdbcUrl
,
maxRetry
,
retryInteral
,
parallelism
,
windowSize
,
slideSize
,
inJerryJdbcUrl
,
inJerryUsername
,
inJerryPassword
).
run
();
// 常驻执行
env
.
execute
(
"ctr-estimate-clk"
);
}
...
...
src/main/java/com/gmei/data/ctr/operator/CtrEstimatePfrOperator.java
View file @
5364b4d3
...
...
@@ -2,28 +2,28 @@ package com.gmei.data.ctr.operator;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
com.gmei.data.ctr.bean.CtrEstimateClkEtl
;
import
com.gmei.data.ctr.bean.DeviceCurrentEstimateClk
;
import
com.gmei.data.ctr.sink.CtrEstimateClkMysqlSink
;
import
com.gmei.data.ctr.bean.CtrEstimatePfrEtl
;
import
com.gmei.data.ctr.bean.DeviceCurrentEstimatePfrTmp
;
import
com.gmei.data.ctr.bean.DeviceCurrentEstimateTagTmp
;
import
com.gmei.data.ctr.sink.CtrEstimatePfrMysqlSink
;
import
com.gmei.data.ctr.sink.CtrEstimateTagMysqlSink
;
import
com.gmei.data.ctr.source.TidbMysqlAsyncPfrSource
;
import
com.gmei.data.ctr.source.TidbMysqlAsyncTagSource
;
import
com.gmei.data.ctr.utils.DateUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.api.common.functions.FilterFunction
;
import
org.apache.flink.api.common.functions.MapFunction
;
import
org.apache.flink.
api.java.functions.KeySelector
;
import
org.apache.flink.
streaming.api.datastream.AsyncDataStream
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
;
import
org.apache.flink.streaming.api.windowing.time.Time
;
import
org.apache.flink.streaming.api.windowing.windows.TimeWindow
;
import
org.apache.flink.util.Collector
;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
;
import
java.util.Arrays
;
import
java.util.Date
;
import
java.util.concurrent.TimeUnit
;
/**
* @ClassName CtrEstimate
Pfr
Operator
* @Description: CTR特征预估
偏好
* @ClassName CtrEstimate
Tag
Operator
* @Description: CTR特征预估
标签表
* @Author zhaojianwei
* @Date 2020/
3/3
1
* @Date 2020/
4/0
1
* @Version V1.0
**/
public
class
CtrEstimatePfrOperator
implements
BaseOperator
{
...
...
@@ -34,8 +34,11 @@ public class CtrEstimatePfrOperator implements BaseOperator{
private
int
parallelism
;
private
int
windowSize
;
private
int
slideSize
;
public
CtrEstimatePfrOperator
(
DataStream
dataStream
,
String
outJdbcUrl
,
int
maxRetry
,
long
retryInteral
,
int
parallelism
,
int
windowSize
,
int
slideSize
)
{
private
String
jerryJdbcUrl
;
private
String
jerryUsername
;
private
String
jerryPassword
;
public
CtrEstimatePfrOperator
(
DataStream
dataStream
,
String
outJdbcUrl
,
int
maxRetry
,
long
retryInteral
,
int
parallelism
,
int
windowSize
,
int
slideSize
,
String
jerryJdbcUrl
,
String
jerryUsername
,
String
jerryPassword
)
{
this
.
dataStream
=
dataStream
;
this
.
outJdbcUrl
=
outJdbcUrl
;
this
.
maxRetry
=
maxRetry
;
...
...
@@ -43,142 +46,127 @@ public class CtrEstimatePfrOperator implements BaseOperator{
this
.
parallelism
=
parallelism
;
this
.
windowSize
=
windowSize
;
this
.
slideSize
=
slideSize
;
this
.
jerryJdbcUrl
=
jerryJdbcUrl
;
this
.
jerryUsername
=
jerryUsername
;
this
.
jerryPassword
=
jerryPassword
;
}
@Override
public
void
run
()
{
dataStream
.
filter
(
new
FilterFunction
<
String
>()
{
@Override
public
boolean
filter
(
String
value
)
throws
Exception
{
return
JSON
.
isValid
(
value
);
}
})
.
map
(
new
MapFunction
<
String
,
JSONObject
>()
{
@Override
public
JSONObject
map
(
String
value
)
throws
Exception
{
return
JSON
.
parseObject
(
value
);
}
})
.
filter
(
new
FilterFunction
<
JSONObject
>()
{
SingleOutputStreamOperator
jsonStream
=
dataStream
.
filter
(
new
FilterFunction
<
String
>()
{
@Override
public
boolean
filter
(
String
value
)
throws
Exception
{
return
JSON
.
isValid
(
value
);
}
})
.
map
(
new
MapFunction
<
String
,
JSONObject
>()
{
@Override
public
JSONObject
map
(
String
value
)
throws
Exception
{
return
JSON
.
parseObject
(
value
);
}
});
// jsonStream.print();
SingleOutputStreamOperator
filter
=
jsonStream
.
filter
(
new
FilterFunction
<
JSONObject
>()
{
@Override
public
boolean
filter
(
JSONObject
jsonObject
)
throws
Exception
{
Double
gmNginxTimestamp
=
jsonObject
.
getDouble
(
"gm_nginx_timestamp"
);
if
(
null
!=
gmNginxTimestamp
){
long
gmNginxTimestampLong
=
Math
.
round
(
gmNginxTimestamp
*
1000
);
String
currentDateStr
=
DateUtils
.
getCurrentDateStr
();
long
currentDateBegin
=
DateUtils
.
getTimestampByDateStr
(
currentDateStr
+
" 00:00:00"
);
long
currentDateend
=
DateUtils
.
getTimestampByDateStr
(
currentDateStr
+
" 23:59:59"
);
if
(
gmNginxTimestampLong
>=
currentDateBegin
&&
gmNginxTimestampLong
<=
currentDateend
){
String
type
=
jsonObject
.
getString
(
"type"
);
Double
gmNginxTimestamp
=
jsonObject
.
getDouble
(
"gm_nginx_timestamp"
);
if
(
null
!=
gmNginxTimestamp
)
{
long
gmNginxTimestampLong
=
Math
.
round
(
gmNginxTimestamp
*
1000
);
String
currentDateStr
=
DateUtils
.
getCurrentDateStr
();
long
currentDateBegin
=
DateUtils
.
getTimestampByDateStr
(
currentDateStr
+
" 00:00:00"
);
long
currentDateend
=
DateUtils
.
getTimestampByDateStr
(
currentDateStr
+
" 23:59:59"
);
if
(
gmNginxTimestampLong
>=
currentDateBegin
&&
gmNginxTimestampLong
<=
currentDateend
)
{
String
type
=
jsonObject
.
getString
(
"type"
);
JSONObject
deviceObject
=
jsonObject
.
getJSONObject
(
"device"
);
if
(
null
!=
deviceObject
&&
StringUtils
.
isNotBlank
(
type
))
{
String
deviceId
=
deviceObject
.
getString
(
"device_id"
);
String
idfv
=
deviceObject
.
getString
(
"idfv"
);
String
clId
=
""
;
if
(
StringUtils
.
isBlank
(
deviceId
)
&&
StringUtils
.
isNotBlank
(
idfv
))
{
clId
=
idfv
;
}
else
{
clId
=
deviceId
;
}
if
(
StringUtils
.
isNotBlank
(
clId
))
{
if
(
"page_view"
.
equals
(
type
))
{
JSONObject
paramsObject
=
jsonObject
.
getJSONObject
(
"params"
);
JSONObject
deviceObject
=
jsonObject
.
getJSONObject
(
"device"
);
if
(
null
!=
paramsObject
&&
null
!=
deviceObject
&&
StringUtils
.
isNotBlank
(
type
)){
String
deviceId
=
deviceObject
.
getString
(
"device_id"
);
String
idfv
=
deviceObject
.
getString
(
"idfv"
);
String
clId
=
""
;
if
(
StringUtils
.
isBlank
(
deviceId
)
&&
StringUtils
.
isNotBlank
(
idfv
)){
clId
=
idfv
;
}
else
{
clId
=
deviceId
;
}
if
(
StringUtils
.
isNotBlank
(
clId
)){
String
pageName
=
paramsObject
.
getString
(
"page_name"
);
String
tabName
=
paramsObject
.
getString
(
"tab_name"
);
if
(
null
!=
pageName
&&
null
!=
tabName
&&
"home"
.
equals
(
pageName
.
trim
())
&&
"精选"
.
equals
(
tabName
.
trim
())){
String
cardContentType
=
paramsObject
.
getString
(
"card_content_type"
);
if
(
"on_click_post_card"
.
equals
(
type
)
||
(
"on_click_card"
.
equals
(
type
)
&&
"user_post"
.
equals
(
cardContentType
))
||
(
"search_result_click_infomation_item"
.
equals
(
type
)
&&
"11"
.
equals
(
paramsObject
.
getString
(
"business_type"
)))){
return
true
;
}
String
[]
types
=
{
"on_click_topic_card"
,
"staggered_topic_click"
,
"zone_detail_click_topic"
,
"zone_v3_click_diary_topic"
,
"diarybook_detail_click_diary_item"
,
"on_click_ugc_topic"
};
String
[]
cardContentTypes
=
{
"topic_detail"
,
"topic"
};
if
(
Arrays
.
asList
(
types
).
contains
(
type
)
||
(
"on_click_card"
.
equals
(
type
)
&&
Arrays
.
asList
(
cardContentTypes
).
contains
(
cardContentType
))){
return
true
;
}
}
String
referrer
=
paramsObject
.
getString
(
"referrer"
);
if
(
"page_view"
.
equals
(
type
)
&&
"answer_detail"
.
equals
(
pageName
)
&&
"home"
.
equals
(
referrer
)){
return
true
;
}
if
(
null
!=
paramsObject
)
{
String
pageName
=
paramsObject
.
getString
(
"page_name"
);
if
(
"diary_detail"
.
equals
(
pageName
)
||
"diarybook_detail"
.
equals
(
pageName
)
||
"topic_detail"
.
equals
(
pageName
)
||
"welfare_detail"
.
equals
(
pageName
)
||
"answer_detail"
.
equals
(
pageName
))
{
String
businessId
=
paramsObject
.
getString
(
"business_id"
);
if
(
null
!=
businessId
)
{
return
true
;
}
}
}
}
}
}
}
return
false
;
}
})
.
map
(
new
MapFunction
<
JSONObject
,
CtrEstimateClkEtl
>()
{
@Override
public
CtrEstimateClkEtl
map
(
JSONObject
jsonObject
)
throws
Exception
{
String
type
=
jsonObject
.
getString
(
"type"
);
JSONObject
paramsObject
=
jsonObject
.
getJSONObject
(
"params"
);
JSONObject
deviceObject
=
jsonObject
.
getJSONObject
(
"device"
);
String
deviceId
=
deviceObject
.
getString
(
"device_id"
);
String
idfv
=
deviceObject
.
getString
(
"idfv"
);
String
clId
=
""
;
if
(
StringUtils
.
isBlank
(
deviceId
)
&&
StringUtils
.
isNotBlank
(
idfv
)){
clId
=
idfv
;
}
else
{
clId
=
deviceId
;
}
String
cardContentType
=
paramsObject
.
getString
(
"card_content_type"
);
if
(
"on_click_post_card"
.
equals
(
type
)
||
(
"on_click_card"
.
equals
(
type
)
&&
"user_post"
.
equals
(
cardContentType
))
||
(
"search_result_click_infomation_item"
.
equals
(
type
)
&&
"11"
.
equals
(
paramsObject
.
getString
(
"business_type"
)))){
return
new
CtrEstimateClkEtl
(
clId
,
"tractate_card"
,
1
);
}
String
[]
types
=
{
"on_click_topic_card"
,
"staggered_topic_click"
,
"zone_detail_click_topic"
,
"zone_v3_click_diary_topic"
,
"diarybook_detail_click_diary_item"
,
"on_click_ugc_topic"
};
String
[]
cardContentTypes
=
{
"topic_detail"
,
"topic"
};
if
(
Arrays
.
asList
(
types
).
contains
(
type
)
||
(
"on_click_card"
.
equals
(
type
)
&&
Arrays
.
asList
(
cardContentTypes
).
contains
(
cardContentType
))){
return
new
CtrEstimateClkEtl
(
clId
,
"content_card"
,
1
);
}
String
pageName
=
paramsObject
.
getString
(
"page_name"
);
String
referrer
=
paramsObject
.
getString
(
"referrer"
);
if
(
"page_view"
.
equals
(
type
)
&&
"answer_detail"
.
equals
(
pageName
)
&&
"home"
.
equals
(
referrer
)){
return
new
CtrEstimateClkEtl
(
clId
,
"answer_card"
,
1
);
}
return
new
CtrEstimateClkEtl
();
}
})
.
keyBy
(
new
KeySelector
<
CtrEstimateClkEtl
,
String
>()
{
@Override
public
String
getKey
(
CtrEstimateClkEtl
estimateClickEtl
)
throws
Exception
{
return
estimateClickEtl
.
getDeviceId
()
+
"_"
+
estimateClickEtl
.
getEstimateType
();
}
})
//.timeWindow(Time.minutes(windowSize), Time.minutes(slideSize))
.
timeWindow
(
Time
.
seconds
(
windowSize
),
Time
.
seconds
(
slideSize
))
.
process
(
new
ProcessWindowFunction
<
CtrEstimateClkEtl
,
DeviceCurrentEstimateClk
,
String
,
TimeWindow
>()
{
@Override
public
void
process
(
String
key
,
Context
context
,
Iterable
<
CtrEstimateClkEtl
>
estimateClickEtls
,
Collector
<
DeviceCurrentEstimateClk
>
out
)
{
/* 数据转置
111 a 1
111 b 1
222 a 1
333 b 1
device_id a b c
111 1 0 0
111 0 1 0
222 1 0 0
333 0 1 0 */
Date
date
=
new
Date
();
for
(
CtrEstimateClkEtl
estimateClickEtl
:
estimateClickEtls
)
{
DeviceCurrentEstimateClk
deviceCurrentEstimateClk
=
new
DeviceCurrentEstimateClk
();
deviceCurrentEstimateClk
.
setDeviceId
(
estimateClickEtl
.
getDeviceId
());
deviceCurrentEstimateClk
.
setPartitionDate
(
DateUtils
.
getDateStr
(
date
));
deviceCurrentEstimateClk
.
setLastUpdateTime
(
DateUtils
.
getTimeStr
(
date
));
if
(
"tractate_card"
.
equals
(
estimateClickEtl
.
getEstimateType
())){
deviceCurrentEstimateClk
.
setTractateCardClick
(
1L
);
}
else
if
(
"content_card"
.
equals
(
estimateClickEtl
.
getEstimateType
())){
deviceCurrentEstimateClk
.
setContentCardClick
(
1L
);
}
else
if
(
"answer_card"
.
equals
(
estimateClickEtl
.
getEstimateType
())){
deviceCurrentEstimateClk
.
setAnswerCardClick
(
1L
);
return
false
;
}
});
//filter.print();
SingleOutputStreamOperator
map
=
filter
.
map
(
new
MapFunction
<
JSONObject
,
CtrEstimatePfrEtl
>()
{
@Override
public
CtrEstimatePfrEtl
map
(
JSONObject
jsonObject
)
throws
Exception
{
String
type
=
jsonObject
.
getString
(
"type"
);
JSONObject
deviceObject
=
jsonObject
.
getJSONObject
(
"device"
);
CtrEstimatePfrEtl
ctrEstimatePfrEtl
=
null
;
if
(
null
!=
deviceObject
&&
StringUtils
.
isNotBlank
(
type
))
{
String
deviceId
=
deviceObject
.
getString
(
"device_id"
);
String
idfv
=
deviceObject
.
getString
(
"idfv"
);
String
clId
=
""
;
if
(
StringUtils
.
isBlank
(
deviceId
)
&&
StringUtils
.
isNotBlank
(
idfv
))
{
clId
=
idfv
;
}
else
{
clId
=
deviceId
;
}
if
(
StringUtils
.
isNotBlank
(
clId
))
{
if
(
"page_view"
.
equals
(
type
))
{
JSONObject
paramsObject
=
jsonObject
.
getJSONObject
(
"params"
);
if
(
null
!=
paramsObject
){
String
pageName
=
paramsObject
.
getString
(
"page_name"
);
String
businessId
=
paramsObject
.
getString
(
"business_id"
);
if
(
StringUtils
.
isNotBlank
(
businessId
)){
ctrEstimatePfrEtl
=
new
CtrEstimatePfrEtl
();
ctrEstimatePfrEtl
.
setDeviceId
(
clId
);
ctrEstimatePfrEtl
.
setStatisticsTypeId
(
businessId
);
if
((
"diary_detail"
.
equals
(
pageName
)
||
"diarybook_detail"
.
equals
(
pageName
))){
ctrEstimatePfrEtl
.
setStatisticsType
(
"diary"
);
}
else
if
(
"topic_detail"
.
equals
(
pageName
)){
ctrEstimatePfrEtl
.
setStatisticsType
(
"tractate"
);
}
else
if
(
"welfare_detail"
.
equals
(
pageName
)){
ctrEstimatePfrEtl
.
setStatisticsType
(
"service"
);
}
else
if
(
"answer_detail"
.
equals
(
pageName
)){
ctrEstimatePfrEtl
.
setStatisticsType
(
"answer"
);
}
}
}
}
}
}
return
ctrEstimatePfrEtl
;
}
out
.
collect
(
deviceCurrentEstimateClk
);
}
}
})
.
addSink
(
new
CtrEstimateClkMysqlSink
(
outJdbcUrl
,
maxRetry
,
retryInteral
))
});
//map.print();
DataStream
<
DeviceCurrentEstimatePfrTmp
>
tidbAsyncDataStream
=
AsyncDataStream
.
unorderedWait
(
map
,
new
TidbMysqlAsyncPfrSource
(
jerryJdbcUrl
,
jerryUsername
,
jerryPassword
),
1
,
TimeUnit
.
MINUTES
,
1000
)
.
uid
(
"tidbAsyncDataStream"
)
.
setParallelism
(
parallelism
);
tidbAsyncDataStream
.
addSink
(
new
CtrEstimatePfrMysqlSink
(
outJdbcUrl
,
maxRetry
,
retryInteral
))
.
setParallelism
(
parallelism
);
}
}
src/main/java/com/gmei/data/ctr/operator/CtrEstimateTagOperator.java
View file @
5364b4d3
...
...
@@ -5,8 +5,8 @@ import com.alibaba.fastjson.JSONObject;
import
com.gmei.data.ctr.bean.CtrEstimateTagEtl
;
import
com.gmei.data.ctr.bean.DeviceCurrentEstimateTagTmp
;
import
com.gmei.data.ctr.sink.CtrEstimateTagMysqlSink
;
import
com.gmei.data.ctr.source.Z
hengxingMysqlAsync
Source
;
import
com.gmei.data.ctr.source.TidbMysqlAsyncSource
;
import
com.gmei.data.ctr.source.Z
xMysqlAsyncTag
Source
;
import
com.gmei.data.ctr.source.TidbMysqlAsync
Tag
Source
;
import
com.gmei.data.ctr.utils.DateUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.api.common.functions.FilterFunction
;
...
...
@@ -174,11 +174,11 @@ public class CtrEstimateTagOperator implements BaseOperator{
});
//map.print();
DataStream
<
DeviceCurrentEstimateTagTmp
>
tidbAsyncDataStream
=
AsyncDataStream
.
unorderedWait
(
map
,
new
TidbMysqlAsyncSource
(
jerryJdbcUrl
,
jerryUsername
,
jerryPassword
),
1
,
TimeUnit
.
MINUTES
,
1000
)
.
unorderedWait
(
map
,
new
TidbMysqlAsync
Tag
Source
(
jerryJdbcUrl
,
jerryUsername
,
jerryPassword
),
1
,
TimeUnit
.
MINUTES
,
1000
)
.
uid
(
"tidbAsyncDataStream"
)
.
setParallelism
(
parallelism
);
DataStream
<
DeviceCurrentEstimateTagTmp
>
zhengxingAsyncDataStream
=
AsyncDataStream
.
unorderedWait
(
map
,
new
Z
hengxingMysqlAsync
Source
(
zxJdbcUrl
,
zxUsername
,
zxPassword
),
1
,
TimeUnit
.
MINUTES
,
1000
)
.
unorderedWait
(
map
,
new
Z
xMysqlAsyncTag
Source
(
zxJdbcUrl
,
zxUsername
,
zxPassword
),
1
,
TimeUnit
.
MINUTES
,
1000
)
.
uid
(
"zhengxingAsyncDataStream"
)
.
setParallelism
(
parallelism
);
DataStream
<
DeviceCurrentEstimateTagTmp
>
asyncDataStream
=
tidbAsyncDataStream
.
union
(
zhengxingAsyncDataStream
);
...
...
src/main/java/com/gmei/data/ctr/sink/CtrEstimatePfrMysqlSink.java
0 → 100644
View file @
5364b4d3
package
com
.
gmei
.
data
.
ctr
.
sink
;
import
com.alibaba.fastjson.JSONObject
;
import
com.gmei.data.ctr.bean.DeviceCurrentEstimatePfr
;
import
com.gmei.data.ctr.bean.DeviceCurrentEstimatePfrTmp
;
import
com.gmei.data.ctr.bean.DeviceCurrentEstimateTag
;
import
com.gmei.data.ctr.bean.DeviceCurrentEstimateTagTmp
;
import
com.gmei.data.ctr.common.Constants
;
import
com.gmei.data.ctr.utils.DateUtils
;
import
com.gmei.data.ctr.utils.JDBCUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.sink.RichSinkFunction
;
import
java.sql.*
;
import
java.util.Date
;
/**
* @ClassName CtrEstimatePfrMysqlSink
* @Description: CTR特征预估偏好MysqlSink
* @Author zhaojianwei
* @Date 2020/3/31
* @Version V1.0
**/
public
class
CtrEstimatePfrMysqlSink
extends
RichSinkFunction
<
DeviceCurrentEstimatePfrTmp
>
{
private
int
maxRetry
;
private
long
retryInteral
;
private
String
jdbcUrl
;
private
Connection
connection
;
public
CtrEstimatePfrMysqlSink
(
String
jdbcUrl
,
int
maxRetry
,
long
retryInteral
)
{
this
.
jdbcUrl
=
jdbcUrl
;
this
.
maxRetry
=
maxRetry
;
this
.
retryInteral
=
retryInteral
;
}
@Override
public
void
open
(
Configuration
parameters
)
throws
Exception
{
Class
.
forName
(
Constants
.
MYSQL_DRIVER_CLASS
);
connection
=
DriverManager
.
getConnection
(
jdbcUrl
);
super
.
open
(
parameters
);
}
@Override
public
void
invoke
(
DeviceCurrentEstimatePfrTmp
deviceCurrentEstimatePfrTmp
,
Context
context
)
throws
Exception
{
try
{
insertOrDel
(
deviceCurrentEstimatePfrTmp
);
}
catch
(
Exception
e
){
e
.
printStackTrace
();
int
numReties
=
1
;
Exception
lastException
=
e
;
while
(
numReties
<=
maxRetry
){
try
{
numReties
++;
Thread
.
sleep
(
retryInteral
);
insertOrDel
(
deviceCurrentEstimatePfrTmp
);
}
catch
(
Exception
e1
){
lastException
=
e1
;
continue
;
}
return
;
}
throw
lastException
;
}
}
@Override
public
void
close
()
throws
Exception
{
JDBCUtils
.
close
(
connection
,
null
,
null
);
super
.
close
();
}
/**
* 数据写入方法
* @param deviceCurrentEstimatePfrTmp
* @throws SQLException
*/
private
void
insertOrDel
(
DeviceCurrentEstimatePfrTmp
deviceCurrentEstimatePfrTmp
)
{
Statement
statement
=
null
;
java
.
util
.
Date
date
=
new
Date
();
try
{
statement
=
connection
.
createStatement
();
statement
.
executeUpdate
(
String
.
format
(
"insert into device_recently_estimate_view_pfr("
+
"device_id,"
+
"statistics_type,"
+
"statistics_type_id,"
+
"project_pfr,"
+
"first_demands_pfr,"
+
"first_positions_pfr,"
+
"first_solutions_pfr,"
+
"second_demands_pfr,"
+
"second_positions_pfr,"
+
"second_solutions_pfr,"
+
"partition_date,"
+
"last_update_time"
+
") values ('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')"
,
deviceCurrentEstimatePfrTmp
.
getDeviceId
(),
deviceCurrentEstimatePfrTmp
.
getStatisticsType
(),
deviceCurrentEstimatePfrTmp
.
getStatisticsTypeId
(),
deviceCurrentEstimatePfrTmp
.
getProjectPfr
(),
deviceCurrentEstimatePfrTmp
.
getFirstDemandsPfr
(),
deviceCurrentEstimatePfrTmp
.
getFirstPositionsPfr
(),
deviceCurrentEstimatePfrTmp
.
getFirstSolutionsPfr
(),
deviceCurrentEstimatePfrTmp
.
getSecondDemandsPfr
(),
deviceCurrentEstimatePfrTmp
.
getSecondPositionsPfr
(),
deviceCurrentEstimatePfrTmp
.
getSecondSolutionsPfr
(),
DateUtils
.
getDateStr
(
date
),
DateUtils
.
getTimeStr
(
date
)
)
);
statement
.
executeUpdate
(
String
.
format
(
"delete from device_recently_estimate_view_pfr where "
+
"device_id = '%s' and "
+
"statistics_type = '%s' and "
+
"statistics_type_id = '%s' and "
+
"last_update_time <= '%s'"
,
deviceCurrentEstimatePfrTmp
.
getDeviceId
(),
deviceCurrentEstimatePfrTmp
.
getStatisticsType
(),
deviceCurrentEstimatePfrTmp
.
getStatisticsTypeId
(),
DateUtils
.
getSevenDaysAgoTimeStr
(
date
)
)
);
}
catch
(
Exception
e
){
e
.
printStackTrace
();
}
finally
{
try
{
if
(
statement
!=
null
)
{
statement
.
close
();
}
if
(
connection
!=
null
)
{
connection
.
close
();
}
}
catch
(
Exception
e
){
e
.
printStackTrace
();
}
}
}
}
src/main/java/com/gmei/data/ctr/source/TidbMysqlAsyncPfrSource.java
0 → 100644
View file @
5364b4d3
package
com
.
gmei
.
data
.
ctr
.
source
;
import
com.alibaba.druid.pool.DruidDataSource
;
import
com.gmei.data.ctr.bean.CtrEstimatePfrEtl
;
import
com.gmei.data.ctr.bean.DeviceCurrentEstimatePfrTmp
;
import
com.gmei.data.ctr.common.Constants
;
import
com.gmei.data.ctr.utils.DateUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.async.ResultFuture
;
import
org.apache.flink.streaming.api.functions.async.RichAsyncFunction
;
import
java.sql.Connection
;
import
java.sql.PreparedStatement
;
import
java.sql.ResultSet
;
import
java.util.Collections
;
import
java.util.Date
;
import
java.util.concurrent.CompletableFuture
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Future
;
import
java.util.function.Supplier
;
import
static
java
.
util
.
concurrent
.
Executors
.
newFixedThreadPool
;
/**
* @ClassName MysqlAsyncSource
* @Author apple
* @Date 2020/3/29
* @Version V1.0
**/
public
class
TidbMysqlAsyncPfrSource
extends
RichAsyncFunction
<
CtrEstimatePfrEtl
,
DeviceCurrentEstimatePfrTmp
>
{
private
String
jerryJdbcUrl
;
private
String
jerryUsername
;
private
String
jerryPassword
;
private
transient
DruidDataSource
dataSource
;
private
transient
ExecutorService
executorService
;
public
TidbMysqlAsyncPfrSource
(
String
jerryJdbcUrl
,
String
jerryUsername
,
String
jerryPassword
)
{
this
.
jerryJdbcUrl
=
jerryJdbcUrl
;
this
.
jerryUsername
=
jerryUsername
;
this
.
jerryPassword
=
jerryPassword
;
}
@Override
public
void
open
(
Configuration
parameters
)
throws
Exception
{
super
.
open
(
parameters
);
executorService
=
newFixedThreadPool
(
20
);
dataSource
=
new
DruidDataSource
();
dataSource
.
setDriverClassName
(
Constants
.
MYSQL_DRIVER_CLASS
);
dataSource
.
setUrl
(
jerryJdbcUrl
);
dataSource
.
setUsername
(
jerryUsername
);
dataSource
.
setPassword
(
jerryPassword
);
dataSource
.
setInitialSize
(
5
);
dataSource
.
setMinIdle
(
10
);
dataSource
.
setMaxActive
(
20
);
}
@Override
public
void
asyncInvoke
(
CtrEstimatePfrEtl
ctrEstimatePfrEtl
,
ResultFuture
<
DeviceCurrentEstimatePfrTmp
>
resultFuture
)
throws
Exception
{
Future
<
DeviceCurrentEstimatePfrTmp
>
future
=
executorService
.
submit
(()
->
{
return
queryFromMySql
(
ctrEstimatePfrEtl
);
});
CompletableFuture
.
supplyAsync
(
new
Supplier
<
DeviceCurrentEstimatePfrTmp
>()
{
@Override
public
DeviceCurrentEstimatePfrTmp
get
()
{
try
{
return
future
.
get
();
}
catch
(
Exception
e
)
{
return
null
;
}
}
}).
thenAccept
((
DeviceCurrentEstimatePfrTmp
dbResult
)
->{
resultFuture
.
complete
(
Collections
.
singleton
(
dbResult
));
});
}
@Override
public
void
close
()
{
dataSource
.
close
();
executorService
.
shutdown
();
}
private
DeviceCurrentEstimatePfrTmp
queryFromMySql
(
CtrEstimatePfrEtl
ctrEstimatePfrEtl
)
{
DeviceCurrentEstimatePfrTmp
dcept
=
null
;
String
statisticsType
=
ctrEstimatePfrEtl
.
getStatisticsType
();
String
deviceId
=
ctrEstimatePfrEtl
.
getDeviceId
();
String
statisticsTypeId
=
ctrEstimatePfrEtl
.
getStatisticsTypeId
();
if
(
statisticsType
!=
null
&&
deviceId
!=
null
&&
statisticsTypeId
!=
null
){
String
sql
=
""
;
if
(
"service"
.
equals
(
statisticsType
)){
sql
=
String
.
format
(
"select first_demands,first_positions,first_solutions,second_demands,second_positions,second_solutions,project_tags "
+
"from strategy_service_tagv3_info where service_id = '%d'"
,
statisticsTypeId
);
}
if
(
"diary"
.
equals
(
statisticsType
)){
sql
=
String
.
format
(
"select first_demands,first_positions,first_solutions,second_demands,second_positions,second_solutions,project_tags "
+
"from strategy_content_tagv3_info where content_id = '%d'"
,
statisticsTypeId
);
}
else
if
(
"tractate"
.
equals
(
statisticsType
)){
sql
=
String
.
format
(
"select first_demands,first_positions,first_solutions,second_demands,second_positions,second_solutions,project_tags "
+
"from strategy_tractate_tagv3_info where content_id = '%d'"
,
statisticsTypeId
);
}
else
if
(
"answer"
.
equals
(
statisticsType
)){
sql
=
String
.
format
(
"select first_demands,first_positions,first_solutions,second_demands,second_positions,second_solutions,project_tags "
+
"from strategy_answer_tagv3_info where content_id = '%d'"
,
statisticsTypeId
);
}
if
(
StringUtils
.
isNotBlank
(
sql
)){
dcept
=
findTagInfo
(
sql
,
statisticsType
,
statisticsTypeId
);
if
(
null
!=
dcept
){
Date
date
=
new
Date
();
dcept
.
setStatisticsType
(
statisticsType
);
dcept
.
setDeviceId
(
deviceId
);
dcept
.
setPartitionDate
(
DateUtils
.
getDateStr
(
date
));
dcept
.
setLastUpdateTime
(
DateUtils
.
getTimeStr
(
date
));
}
}
}
return
dcept
;
}
private
DeviceCurrentEstimatePfrTmp
findTagInfo
(
String
sql
,
String
statisticsType
,
String
statisticsTypeId
){
DeviceCurrentEstimatePfrTmp
deviceCurrentEstimatePfrTmp
=
null
;
Connection
connection
=
null
;
PreparedStatement
stmt
=
null
;
ResultSet
rs
=
null
;
try
{
connection
=
dataSource
.
getConnection
();
stmt
=
connection
.
prepareStatement
(
sql
);
rs
=
stmt
.
executeQuery
();
while
(
rs
.
next
()){
deviceCurrentEstimatePfrTmp
=
new
DeviceCurrentEstimatePfrTmp
();
deviceCurrentEstimatePfrTmp
.
setStatisticsType
(
statisticsType
);
deviceCurrentEstimatePfrTmp
.
setStatisticsTypeId
(
statisticsTypeId
);
deviceCurrentEstimatePfrTmp
.
setProjectPfr
(
rs
.
getString
(
"project_tags"
));
deviceCurrentEstimatePfrTmp
.
setFirstDemandsPfr
(
rs
.
getString
(
"first_demands"
));
deviceCurrentEstimatePfrTmp
.
setFirstPositionsPfr
(
rs
.
getString
(
"first_positions"
));
deviceCurrentEstimatePfrTmp
.
setFirstSolutionsPfr
(
rs
.
getString
(
"first_solutions"
));
deviceCurrentEstimatePfrTmp
.
setSecondDemandsPfr
(
rs
.
getString
(
"second_demands"
));
deviceCurrentEstimatePfrTmp
.
setSecondPositionsPfr
(
rs
.
getString
(
"second_positions"
));
deviceCurrentEstimatePfrTmp
.
setSecondSolutionsPfr
(
rs
.
getString
(
"second_solutions"
));
}
}
catch
(
Exception
e
){
e
.
printStackTrace
();
}
finally
{
try
{
if
(
rs
!=
null
)
{
rs
.
close
();
}
if
(
stmt
!=
null
)
{
stmt
.
close
();
}
if
(
connection
!=
null
)
{
connection
.
close
();
}
}
catch
(
Exception
e
){
e
.
printStackTrace
();
}
}
return
deviceCurrentEstimatePfrTmp
;
}
}
\ No newline at end of file
src/main/java/com/gmei/data/ctr/source/TidbMysqlAsyncSource.java
→
src/main/java/com/gmei/data/ctr/source/TidbMysqlAsync
Tag
Source.java
View file @
5364b4d3
...
...
@@ -28,14 +28,14 @@ import static java.util.concurrent.Executors.newFixedThreadPool;
* @Date 2020/3/29
* @Version V1.0
**/
public
class
TidbMysqlAsyncSource
extends
RichAsyncFunction
<
CtrEstimateTagEtl
,
DeviceCurrentEstimateTagTmp
>
{
public
class
TidbMysqlAsync
Tag
Source
extends
RichAsyncFunction
<
CtrEstimateTagEtl
,
DeviceCurrentEstimateTagTmp
>
{
private
String
jerryJdbcUrl
;
private
String
jerryUsername
;
private
String
jerryPassword
;
private
transient
DruidDataSource
dataSource
;
private
transient
ExecutorService
executorService
;
public
TidbMysqlAsyncSource
(
String
jerryJdbcUrl
,
String
jerryUsername
,
String
jerryPassword
)
{
public
TidbMysqlAsync
Tag
Source
(
String
jerryJdbcUrl
,
String
jerryUsername
,
String
jerryPassword
)
{
this
.
jerryJdbcUrl
=
jerryJdbcUrl
;
this
.
jerryUsername
=
jerryUsername
;
this
.
jerryPassword
=
jerryPassword
;
...
...
src/main/java/com/gmei/data/ctr/source/Z
hengxingMysqlAsync
Source.java
→
src/main/java/com/gmei/data/ctr/source/Z
xMysqlAsyncTag
Source.java
View file @
5364b4d3
...
...
@@ -28,14 +28,14 @@ import static java.util.concurrent.Executors.newFixedThreadPool;
* @Date 2020/3/29
* @Version V1.0
**/
public
class
Z
hengxingMysqlAsync
Source
extends
RichAsyncFunction
<
CtrEstimateTagEtl
,
DeviceCurrentEstimateTagTmp
>
{
public
class
Z
xMysqlAsyncTag
Source
extends
RichAsyncFunction
<
CtrEstimateTagEtl
,
DeviceCurrentEstimateTagTmp
>
{
private
String
zxJdbcUrl
;
private
String
zxUsername
;
private
String
zxPassword
;
private
transient
DruidDataSource
dataSource
;
private
transient
ExecutorService
executorService
;
public
Z
hengxingMysqlAsync
Source
(
String
zxJdbcUrl
,
String
zxUsername
,
String
zxPassword
)
{
public
Z
xMysqlAsyncTag
Source
(
String
zxJdbcUrl
,
String
zxUsername
,
String
zxPassword
)
{
this
.
zxJdbcUrl
=
zxJdbcUrl
;
this
.
zxUsername
=
zxUsername
;
this
.
zxPassword
=
zxPassword
;
...
...
src/main/java/com/gmei/data/ctr/utils/DateUtils.java
View file @
5364b4d3
...
...
@@ -51,6 +51,19 @@ public class DateUtils {
return
sdf
.
format
(
calendar
.
getTime
());
}
/**
* 获取七天前的时间
* @param date
* @return
*/
public
static
String
getSevenDaysAgoTimeStr
(
Date
date
)
{
Calendar
calendar
=
Calendar
.
getInstance
();
calendar
.
setTime
(
date
);
calendar
.
add
(
Calendar
.
DAY_OF_MONTH
,-
7
);
SimpleDateFormat
sdf
=
new
SimpleDateFormat
(
DATE_FORMATE_YMDHMS
);
return
sdf
.
format
(
calendar
.
getTime
());
}
/**
* 获取当前时间戳
* @param date
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment