Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
F
flink-monitor
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
赵建伟
flink-monitor
Commits
5b2fd98b
Commit
5b2fd98b
authored
Mar 25, 2020
by
赵建伟
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update codes
parent
80f547e9
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
458 additions
and
12 deletions
+458
-12
start_shd.sh
bin/start_shd.sh
+68
-0
start_suc.sh
bin/start_suc.sh
+68
-0
stop_shd.sh
bin/stop_shd.sh
+5
-0
stop_suc.sh
bin/stop_suc.sh
+5
-0
PortraitMonitorMain.java
src/main/java/com/gmei/data/monitor/PortraitMonitorMain.java
+12
-12
PortraitMonitorMainShd.java
...in/java/com/gmei/data/monitor/PortraitMonitorMainShd.java
+100
-0
PortraitMonitorMainShdSuc.java
...java/com/gmei/data/monitor/PortraitMonitorMainShdSuc.java
+100
-0
PortraitMonitorMainSuc.java
...in/java/com/gmei/data/monitor/PortraitMonitorMainSuc.java
+100
-0
No files found.
bin/start_shd.sh
0 → 100755
View file @
5b2fd98b
#!/bin/bash
export
FLINK_HOME
=
/opt/flink-1.9.0
export
JAR_DIR
=
/srv/apps/flink-monitor/libs
export
HADOOP_HOME
=
/opt/hadoop-2.6.0-cdh5.16.1
export
JAVA_OPTS
=
"-Xms1024m -Xmx8192m -XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC"
nohup
$FLINK_HOME
/bin/flink run
\
-m
yarn-cluster
\
-ynm
portrait-monitor-shd
\
-yqu
flink
\
-yn
2
\
-ys
2
\
-p
4
\
-yjm
1024
\
-ytm
2048
\
-c
'com.gmei.data.monitor.PortraitMonitorMainShd'
$JAR_DIR
/flink-monitor-1.0-SNAPSHOT.jar
\
--inBrokers
'172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092'
\
--batchSize
1000
\
--maidianInTopic
'gm-maidian-data'
\
--backendInTopic
'gm-logging-prod'
\
--portraitSucInTopic
'gm-portrait-result'
\
--portraitErrGroupId
'flink_monitor_err'
\
--portraitShdGroupId
'flink_monitor_shd'
\
--portraitSucGroupId
'flink_monitor_suc'
\
--windowSize
600
\
--slideSize
600
\
--outJdbcUrl
'jdbc:mysql://172.18.44.3:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false'
\
--maxRetry
3
\
--retryInteral
3000
\
--checkpointPath
'hdfs://bj-gmei-hdfs/user/data/flink/flink-monitor/checkpoint'
\
--parallelism
12
\
--startTime
'2020-03-25 17:50:00'
\
>>
/data/log/flink-monitor/flink-monitor.out 2>&1 &
tail
-10f
/data/log/flink-monitor/flink-monitor.out
#$FLINK_HOME/bin/flink run \
#-m yarn-cluster \
#-yn 3 \
#-ynm portrait_monitor \
#-yqu flink \
#-p 6 \
#-yjm 1024 \
#-ytm 2048 \
#-ys 2 \
#$JAR_DIR/flink-monitor-1.0-SNAPSHOT.jar \
#--inBrokers 'test003:9092' \
#--batchSize 1000 \
#--maidianInTopic 'test11' \
#--backendInTopic 'test12' \
#--portraitSucTopic 'test13' \
#--portraitErrGroupId 'flink_monitor_err' \
#--portraitShdGroupId 'flink_monitor_shd' \
#--portraitSucGroupId 'flink_monitor_suc' \
#--windowSize 3 \
#--slideSize 3 \
#--outJdbcUrl 'jdbc:mysql://test002:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false' \
#--maxRetry 1 \
#--retryInteral 1000 \
#--checkpointPath 'hdfs://bj-test-gmei-hdfs/user/data/flink/bl_hdfs_maidian_open/checkpoint' \
#--isStartFromEarliest false \
#--isStartFromLatest false \
#--startTime '2020-03-25 14:15:00' \
#--parallelism 10
\ No newline at end of file
bin/start_suc.sh
0 → 100755
View file @
5b2fd98b
#!/bin/bash
export
FLINK_HOME
=
/opt/flink-1.9.0
export
JAR_DIR
=
/srv/apps/flink-monitor/libs
export
HADOOP_HOME
=
/opt/hadoop-2.6.0-cdh5.16.1
export
JAVA_OPTS
=
"-Xms1024m -Xmx8192m -XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC"
nohup
$FLINK_HOME
/bin/flink run
\
-m
yarn-cluster
\
-ynm
portrait-monitor-suc
\
-yqu
flink
\
-yn
2
\
-ys
2
\
-p
4
\
-yjm
1024
\
-ytm
2048
\
-c
'com.gmei.data.monitor.PortraitMonitorMainSuc'
$JAR_DIR
/flink-monitor-1.0-SNAPSHOT.jar
\
--inBrokers
'172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092'
\
--batchSize
1000
\
--maidianInTopic
'gm-maidian-data'
\
--backendInTopic
'gm-logging-prod'
\
--portraitSucInTopic
'gm-portrait-result'
\
--portraitErrGroupId
'flink_monitor_err'
\
--portraitShdGroupId
'flink_monitor_shd'
\
--portraitSucGroupId
'flink_monitor_suc'
\
--windowSize
600
\
--slideSize
600
\
--outJdbcUrl
'jdbc:mysql://172.18.44.3:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false'
\
--maxRetry
3
\
--retryInteral
3000
\
--checkpointPath
'hdfs://bj-gmei-hdfs/user/data/flink/flink-monitor/checkpoint'
\
--parallelism
12
\
--startTime
'2020-03-25 17:50:00'
\
>>
/data/log/flink-monitor/flink-monitor.out 2>&1 &
tail
-10f
/data/log/flink-monitor/flink-monitor.out
#$FLINK_HOME/bin/flink run \
#-m yarn-cluster \
#-yn 3 \
#-ynm portrait_monitor \
#-yqu flink \
#-p 6 \
#-yjm 1024 \
#-ytm 2048 \
#-ys 2 \
#$JAR_DIR/flink-monitor-1.0-SNAPSHOT.jar \
#--inBrokers 'test003:9092' \
#--batchSize 1000 \
#--maidianInTopic 'test11' \
#--backendInTopic 'test12' \
#--portraitSucTopic 'test13' \
#--portraitErrGroupId 'flink_monitor_err' \
#--portraitShdGroupId 'flink_monitor_shd' \
#--portraitSucGroupId 'flink_monitor_suc' \
#--windowSize 3 \
#--slideSize 3 \
#--outJdbcUrl 'jdbc:mysql://test002:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false' \
#--maxRetry 1 \
#--retryInteral 1000 \
#--checkpointPath 'hdfs://bj-test-gmei-hdfs/user/data/flink/bl_hdfs_maidian_open/checkpoint' \
#--isStartFromEarliest false \
#--isStartFromLatest false \
#--startTime '2020-03-25 14:15:00' \
#--parallelism 10
\ No newline at end of file
bin/stop_shd.sh
0 → 100755
View file @
5b2fd98b
#!/bin/bash
app_id
=
`
/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application
-list
|
grep
portrait-monitor-shd |
awk
'{print $1}'
`
/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application
-kill
$app_id
\ No newline at end of file
bin/stop_suc.sh
0 → 100755
View file @
5b2fd98b
#!/bin/bash
app_id
=
`
/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application
-list
|
grep
portrait-monitor-suc |
awk
'{print $1}'
`
/opt/hadoop-2.6.0-cdh5.16.1/bin/yarn application
-kill
$app_id
\ No newline at end of file
src/main/java/com/gmei/data/monitor/PortraitMonitorMain.java
View file @
5b2fd98b
...
...
@@ -58,17 +58,17 @@ public class PortraitMonitorMain {
config
.
enableExternalizedCheckpoints
(
CheckpointConfig
.
ExternalizedCheckpointCleanup
.
RETAIN_ON_CANCELLATION
);
// 获取数据源
//
DataStream portraitErrDataStream = new PortraitKafkaSource(
//
env,
//
inBrokers,
//
maidianInTopic,
//
backendInTopic,
//
portraitErrGroupId,
//
batchSize,
//
isStartFromEarliest,
//
isStartFromLatest,
//
startTime
//
).getInstance();
DataStream
portraitErrDataStream
=
new
PortraitKafkaSource
(
env
,
inBrokers
,
maidianInTopic
,
backendInTopic
,
portraitErrGroupId
,
batchSize
,
isStartFromEarliest
,
isStartFromLatest
,
startTime
).
getInstance
();
DataStream
portraitShdDataStream
=
new
PortraitKafkaSource
(
env
,
inBrokers
,
...
...
@@ -92,7 +92,7 @@ public class PortraitMonitorMain {
).
getInstance
();
// 执行处理核心逻辑
//
new PortraitMonitorErrOperator(portraitErrDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new
PortraitMonitorErrOperator
(
portraitErrDataStream
,
outJdbcUrl
,
maxRetry
,
retryInteral
,
parallelism
).
run
();
new
PortraitMonitorShdOperator
(
portraitShdDataStream
,
windowSize
,
slideSize
,
outJdbcUrl
,
maxRetry
,
retryInteral
,
parallelism
).
run
();
new
PortraitMonitorSucOperator
(
portraitSucDataStream
,
windowSize
,
slideSize
,
outJdbcUrl
,
maxRetry
,
retryInteral
,
parallelism
).
run
();
...
...
src/main/java/com/gmei/data/monitor/PortraitMonitorMainShd.java
0 → 100644
View file @
5b2fd98b
package
com
.
gmei
.
data
.
monitor
;
import
com.gmei.data.monitor.operator.PortraitMonitorShdOperator
;
import
com.gmei.data.monitor.operator.PortraitMonitorSucOperator
;
import
com.gmei.data.monitor.source.PortraitKafkaSource
;
import
com.gmei.data.monitor.source.PortraitSucKafkaSource
;
import
org.apache.flink.api.common.restartstrategy.RestartStrategies
;
import
org.apache.flink.api.java.utils.ParameterTool
;
import
org.apache.flink.runtime.state.filesystem.FsStateBackend
;
import
org.apache.flink.streaming.api.TimeCharacteristic
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.environment.CheckpointConfig
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
/**
* @ClassName PortraitMonitorMain
* @Description: 画像打点实时监控主入口
*
* @Author zhaojianwei
* @Date 2020/3/18
* @Version V1.0
**/
public
class
PortraitMonitorMainShd
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
// 获取运行参数
ParameterTool
parameterTool
=
ParameterTool
.
fromArgs
(
args
);
String
inBrokers
=
parameterTool
.
get
(
"inBrokers"
,
"test003:9092"
);
String
batchSize
=
parameterTool
.
get
(
"batchSize"
,
"1000"
);
String
maidianInTopic
=
parameterTool
.
get
(
"maidianInTopic"
,
"test11"
);
String
backendInTopic
=
parameterTool
.
get
(
"backendInTopic"
,
"test12"
);
String
portraitSucInTopic
=
parameterTool
.
get
(
"portraitSucInTopic"
,
"test13"
);
String
portraitErrGroupId
=
parameterTool
.
get
(
"portraitErrGroupId"
,
"flink_monitor_err"
);
String
portraitShdGroupId
=
parameterTool
.
get
(
"portraitShdGroupId"
,
"flink_monitor_shd"
);
String
portraitSucGroupId
=
parameterTool
.
get
(
"portraitSucGroupId"
,
"flink_monitor_suc"
);
Integer
windowSize
=
parameterTool
.
getInt
(
"windowSize"
,
60
);
Integer
slideSize
=
parameterTool
.
getInt
(
"slideSize"
,
60
);
String
outJdbcUrl
=
parameterTool
.
get
(
"outJdbcUrl"
,
"jdbc:mysql://test002:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false"
);
Integer
maxRetry
=
parameterTool
.
getInt
(
"maxRetry"
,
3
);
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
String
checkpointPath
=
parameterTool
.
get
(
"checkpointPath"
,
"hdfs://bj-gmei-hdfs/user/data/flink/flink-monitor/checkpoint"
);
Boolean
isStartFromEarliest
=
parameterTool
.
getBoolean
(
"isStartFromEarliest"
,
false
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
false
);
String
startTime
=
parameterTool
.
get
(
"startTime"
);
Integer
parallelism
=
parameterTool
.
getInt
(
"parallelism"
,
2
);
// 获得流处理环境对象
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
env
.
setStreamTimeCharacteristic
(
TimeCharacteristic
.
EventTime
);
env
.
enableCheckpointing
(
1000
);
env
.
setStateBackend
(
new
FsStateBackend
(
checkpointPath
));
env
.
setRestartStrategy
(
RestartStrategies
.
fixedDelayRestart
(
1
,
3000
));
CheckpointConfig
config
=
env
.
getCheckpointConfig
();
config
.
enableExternalizedCheckpoints
(
CheckpointConfig
.
ExternalizedCheckpointCleanup
.
RETAIN_ON_CANCELLATION
);
// 获取数据源
// DataStream portraitErrDataStream = new PortraitKafkaSource(
// env,
// inBrokers,
// maidianInTopic,
// backendInTopic,
// portraitErrGroupId,
// batchSize,
// isStartFromEarliest,
// isStartFromLatest,
// startTime
// ).getInstance();
DataStream
portraitShdDataStream
=
new
PortraitKafkaSource
(
env
,
inBrokers
,
maidianInTopic
,
backendInTopic
,
portraitShdGroupId
,
batchSize
,
isStartFromEarliest
,
isStartFromLatest
,
startTime
).
getInstance
();
// DataStream portraitSucDataStream = new PortraitSucKafkaSource(
// env,
// inBrokers,
// portraitSucInTopic,
// portraitSucGroupId,
// batchSize,
// isStartFromEarliest,
// isStartFromLatest,
// startTime
// ).getInstance();
// 执行处理核心逻辑
// new PortraitMonitorErrOperator(portraitErrDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new
PortraitMonitorShdOperator
(
portraitShdDataStream
,
windowSize
,
slideSize
,
outJdbcUrl
,
maxRetry
,
retryInteral
,
parallelism
).
run
();
// new PortraitMonitorSucOperator(portraitSucDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
// 常驻执行
env
.
execute
(
"Portrait realtime monitor"
);
}
}
src/main/java/com/gmei/data/monitor/PortraitMonitorMainShdSuc.java
0 → 100644
View file @
5b2fd98b
package
com
.
gmei
.
data
.
monitor
;
import
com.gmei.data.monitor.operator.PortraitMonitorShdOperator
;
import
com.gmei.data.monitor.operator.PortraitMonitorSucOperator
;
import
com.gmei.data.monitor.source.PortraitKafkaSource
;
import
com.gmei.data.monitor.source.PortraitSucKafkaSource
;
import
org.apache.flink.api.common.restartstrategy.RestartStrategies
;
import
org.apache.flink.api.java.utils.ParameterTool
;
import
org.apache.flink.runtime.state.filesystem.FsStateBackend
;
import
org.apache.flink.streaming.api.TimeCharacteristic
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.environment.CheckpointConfig
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
/**
* @ClassName PortraitMonitorMain
* @Description: 画像打点实时监控主入口
*
* @Author zhaojianwei
* @Date 2020/3/18
* @Version V1.0
**/
public
class
PortraitMonitorMainShdSuc
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
// 获取运行参数
ParameterTool
parameterTool
=
ParameterTool
.
fromArgs
(
args
);
String
inBrokers
=
parameterTool
.
get
(
"inBrokers"
,
"test003:9092"
);
String
batchSize
=
parameterTool
.
get
(
"batchSize"
,
"1000"
);
String
maidianInTopic
=
parameterTool
.
get
(
"maidianInTopic"
,
"test11"
);
String
backendInTopic
=
parameterTool
.
get
(
"backendInTopic"
,
"test12"
);
String
portraitSucInTopic
=
parameterTool
.
get
(
"portraitSucInTopic"
,
"test13"
);
String
portraitErrGroupId
=
parameterTool
.
get
(
"portraitErrGroupId"
,
"flink_monitor_err"
);
String
portraitShdGroupId
=
parameterTool
.
get
(
"portraitShdGroupId"
,
"flink_monitor_shd"
);
String
portraitSucGroupId
=
parameterTool
.
get
(
"portraitSucGroupId"
,
"flink_monitor_suc"
);
Integer
windowSize
=
parameterTool
.
getInt
(
"windowSize"
,
60
);
Integer
slideSize
=
parameterTool
.
getInt
(
"slideSize"
,
60
);
String
outJdbcUrl
=
parameterTool
.
get
(
"outJdbcUrl"
,
"jdbc:mysql://test002:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false"
);
Integer
maxRetry
=
parameterTool
.
getInt
(
"maxRetry"
,
3
);
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
String
checkpointPath
=
parameterTool
.
get
(
"checkpointPath"
,
"hdfs://bj-gmei-hdfs/user/data/flink/flink-monitor/checkpoint"
);
Boolean
isStartFromEarliest
=
parameterTool
.
getBoolean
(
"isStartFromEarliest"
,
false
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
false
);
String
startTime
=
parameterTool
.
get
(
"startTime"
);
Integer
parallelism
=
parameterTool
.
getInt
(
"parallelism"
,
2
);
// 获得流处理环境对象
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
env
.
setStreamTimeCharacteristic
(
TimeCharacteristic
.
EventTime
);
env
.
enableCheckpointing
(
1000
);
env
.
setStateBackend
(
new
FsStateBackend
(
checkpointPath
));
env
.
setRestartStrategy
(
RestartStrategies
.
fixedDelayRestart
(
1
,
3000
));
CheckpointConfig
config
=
env
.
getCheckpointConfig
();
config
.
enableExternalizedCheckpoints
(
CheckpointConfig
.
ExternalizedCheckpointCleanup
.
RETAIN_ON_CANCELLATION
);
// 获取数据源
// DataStream portraitErrDataStream = new PortraitKafkaSource(
// env,
// inBrokers,
// maidianInTopic,
// backendInTopic,
// portraitErrGroupId,
// batchSize,
// isStartFromEarliest,
// isStartFromLatest,
// startTime
// ).getInstance();
DataStream
portraitShdDataStream
=
new
PortraitKafkaSource
(
env
,
inBrokers
,
maidianInTopic
,
backendInTopic
,
portraitShdGroupId
,
batchSize
,
isStartFromEarliest
,
isStartFromLatest
,
startTime
).
getInstance
();
DataStream
portraitSucDataStream
=
new
PortraitSucKafkaSource
(
env
,
inBrokers
,
portraitSucInTopic
,
portraitSucGroupId
,
batchSize
,
isStartFromEarliest
,
isStartFromLatest
,
startTime
).
getInstance
();
// 执行处理核心逻辑
// new PortraitMonitorErrOperator(portraitErrDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new
PortraitMonitorShdOperator
(
portraitShdDataStream
,
windowSize
,
slideSize
,
outJdbcUrl
,
maxRetry
,
retryInteral
,
parallelism
).
run
();
new
PortraitMonitorSucOperator
(
portraitSucDataStream
,
windowSize
,
slideSize
,
outJdbcUrl
,
maxRetry
,
retryInteral
,
parallelism
).
run
();
// 常驻执行
env
.
execute
(
"Portrait realtime monitor"
);
}
}
src/main/java/com/gmei/data/monitor/PortraitMonitorMainSuc.java
0 → 100644
View file @
5b2fd98b
package
com
.
gmei
.
data
.
monitor
;
import
com.gmei.data.monitor.operator.PortraitMonitorShdOperator
;
import
com.gmei.data.monitor.operator.PortraitMonitorSucOperator
;
import
com.gmei.data.monitor.source.PortraitKafkaSource
;
import
com.gmei.data.monitor.source.PortraitSucKafkaSource
;
import
org.apache.flink.api.common.restartstrategy.RestartStrategies
;
import
org.apache.flink.api.java.utils.ParameterTool
;
import
org.apache.flink.runtime.state.filesystem.FsStateBackend
;
import
org.apache.flink.streaming.api.TimeCharacteristic
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.environment.CheckpointConfig
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
/**
* @ClassName PortraitMonitorMain
* @Description: 画像打点实时监控主入口
*
* @Author zhaojianwei
* @Date 2020/3/18
* @Version V1.0
**/
public
class
PortraitMonitorMainSuc
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
// 获取运行参数
ParameterTool
parameterTool
=
ParameterTool
.
fromArgs
(
args
);
String
inBrokers
=
parameterTool
.
get
(
"inBrokers"
,
"test003:9092"
);
String
batchSize
=
parameterTool
.
get
(
"batchSize"
,
"1000"
);
String
maidianInTopic
=
parameterTool
.
get
(
"maidianInTopic"
,
"test11"
);
String
backendInTopic
=
parameterTool
.
get
(
"backendInTopic"
,
"test12"
);
String
portraitSucInTopic
=
parameterTool
.
get
(
"portraitSucInTopic"
,
"test13"
);
String
portraitErrGroupId
=
parameterTool
.
get
(
"portraitErrGroupId"
,
"flink_monitor_err"
);
String
portraitShdGroupId
=
parameterTool
.
get
(
"portraitShdGroupId"
,
"flink_monitor_shd"
);
String
portraitSucGroupId
=
parameterTool
.
get
(
"portraitSucGroupId"
,
"flink_monitor_suc"
);
Integer
windowSize
=
parameterTool
.
getInt
(
"windowSize"
,
60
);
Integer
slideSize
=
parameterTool
.
getInt
(
"slideSize"
,
60
);
String
outJdbcUrl
=
parameterTool
.
get
(
"outJdbcUrl"
,
"jdbc:mysql://test002:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false"
);
Integer
maxRetry
=
parameterTool
.
getInt
(
"maxRetry"
,
3
);
Long
retryInteral
=
parameterTool
.
getLong
(
"retryInteral"
,
3000
);
String
checkpointPath
=
parameterTool
.
get
(
"checkpointPath"
,
"hdfs://bj-gmei-hdfs/user/data/flink/flink-monitor/checkpoint"
);
Boolean
isStartFromEarliest
=
parameterTool
.
getBoolean
(
"isStartFromEarliest"
,
false
);
Boolean
isStartFromLatest
=
parameterTool
.
getBoolean
(
"isStartFromLatest"
,
false
);
String
startTime
=
parameterTool
.
get
(
"startTime"
);
Integer
parallelism
=
parameterTool
.
getInt
(
"parallelism"
,
2
);
// 获得流处理环境对象
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
env
.
setStreamTimeCharacteristic
(
TimeCharacteristic
.
EventTime
);
env
.
enableCheckpointing
(
1000
);
env
.
setStateBackend
(
new
FsStateBackend
(
checkpointPath
));
env
.
setRestartStrategy
(
RestartStrategies
.
fixedDelayRestart
(
1
,
3000
));
CheckpointConfig
config
=
env
.
getCheckpointConfig
();
config
.
enableExternalizedCheckpoints
(
CheckpointConfig
.
ExternalizedCheckpointCleanup
.
RETAIN_ON_CANCELLATION
);
// 获取数据源
// DataStream portraitErrDataStream = new PortraitKafkaSource(
// env,
// inBrokers,
// maidianInTopic,
// backendInTopic,
// portraitErrGroupId,
// batchSize,
// isStartFromEarliest,
// isStartFromLatest,
// startTime
// ).getInstance();
// DataStream portraitShdDataStream = new PortraitKafkaSource(
// env,
// inBrokers,
// maidianInTopic,
// backendInTopic,
// portraitShdGroupId,
// batchSize,
// isStartFromEarliest,
// isStartFromLatest,
// startTime
// ).getInstance();
DataStream
portraitSucDataStream
=
new
PortraitSucKafkaSource
(
env
,
inBrokers
,
portraitSucInTopic
,
portraitSucGroupId
,
batchSize
,
isStartFromEarliest
,
isStartFromLatest
,
startTime
).
getInstance
();
// 执行处理核心逻辑
// new PortraitMonitorErrOperator(portraitErrDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
// new PortraitMonitorShdOperator(portraitShdDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new
PortraitMonitorSucOperator
(
portraitSucDataStream
,
windowSize
,
slideSize
,
outJdbcUrl
,
maxRetry
,
retryInteral
,
parallelism
).
run
();
// 常驻执行
env
.
execute
(
"Portrait realtime monitor"
);
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment