Commit 2d9560d2 authored by 刘喆's avatar 刘喆

update cehckpoint config

parent f0cfcaa1
...@@ -97,24 +97,27 @@ public class PreciseExposureStreaming { ...@@ -97,24 +97,27 @@ public class PreciseExposureStreaming {
/* /*
checkpoint设置 checkpoint设置
*/ */
//start a checkpoint every 300000 ms(时间设置太短导致checkout还未完成就已超时,目前设置为5分钟) //start a checkpoint every 300000 ms(时间设置太短导致checkout还未完成就已超时,目前设置为5分钟)
env.enableCheckpointing(300000); env.enableCheckpointing(300000);
//设置statebackend(默认为true)
env.setStateBackend(new FsStateBackend(checkpointPath,true));
//重试次数1,重试间隔时间30s
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
//set mode to exactly-once (this is the default) //set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//make sure 500 ms of progress happen between checkpoints //make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//checkpoints have to complete within one minute, or are discarded //checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setCheckpointTimeout(180000);
//allow only one checkpoint to be in progress at the same time //allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//enable externalized checkpoints which are retained after job cancellation //enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure. //This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true); // env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
//设置statebackend
env.setStateBackend(new FsStateBackend(checkpointPath,true));
//重试次数1,重试间隔时间30s
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
//设置时间属性(EventTime:事件时间;IngestionTime:接入时间;ProcessingTime:处理时间(默认)) //设置时间属性(EventTime:事件时间;IngestionTime:接入时间;ProcessingTime:处理时间(默认))
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment