Commit 54065e39 authored by 赵建伟's avatar 赵建伟

update codes

parent 2fb0f6f1
...@@ -23,8 +23,8 @@ $JAR_DIR/flink-monitor-1.0-SNAPSHOT.jar \ ...@@ -23,8 +23,8 @@ $JAR_DIR/flink-monitor-1.0-SNAPSHOT.jar \
--portraitErrGroupId 'flink_monitor_err' \ --portraitErrGroupId 'flink_monitor_err' \
--portraitShdGroupId 'flink_monitor_shd' \ --portraitShdGroupId 'flink_monitor_shd' \
--portraitSucGroupId 'flink_monitor_suc' \ --portraitSucGroupId 'flink_monitor_suc' \
--windowSize 600 \ --windowSize 60 \
--slideSize 600 \ --slideSize 60 \
--outJdbcUrl 'jdbc:mysql://172.18.44.3:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false' \ --outJdbcUrl 'jdbc:mysql://172.18.44.3:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false' \
--maxRetry 3 \ --maxRetry 3 \
--retryInteral 3000 \ --retryInteral 3000 \
......
...@@ -49,15 +49,13 @@ public class PortraitMonitorMain { ...@@ -49,15 +49,13 @@ public class PortraitMonitorMain {
// 获得流处理环境对象 // 获得流处理环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(1000); env.enableCheckpointing(1000);
env.setStateBackend(new FsStateBackend(checkpointPath)); env.setStateBackend(new FsStateBackend(checkpointPath));
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000)); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
CheckpointConfig config = env.getCheckpointConfig(); CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 获取数据源 // 获取数据源
DataStream portraitErrDataStream = new PortraitKafkaSource( DataStream portraitErrDataStream = new PortraitKafkaSource(
env, env,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment