Commit f2cd3caa authored by 赵建伟's avatar 赵建伟

update codes

parent 024c1533
......@@ -30,7 +30,8 @@ $JAR_DIR/flink-monitor-1.0-SNAPSHOT.jar \
--retryInteral 3000 \
--checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/flink-monitor/checkpoint' \
--parallelism 12 \
--startTime '2020-03-25 14:15:00' >> /data/log/flink-monitor/flink-monitor.out 2>&1 &
--isStartFromLatest true \
>> /data/log/flink-monitor/flink-monitor.out 2>&1 &
tail -10f /data/log/flink-monitor/flink-monitor.out
......@@ -61,5 +62,5 @@ tail -10f /data/log/flink-monitor/flink-monitor.out
#--checkpointPath 'hdfs://bj-test-gmei-hdfs/user/data/flink/bl_hdfs_maidian_open/checkpoint' \
#--isStartFromEarliest false \
#--isStartFromLatest false \
#--startTime '2020-03-18 00:00:00' \
#--startTime '2020-03-25 14:15:00' \
#--parallelism 10
\ No newline at end of file
......@@ -91,9 +91,9 @@ def send_dingding(summary_msg):
# online
# ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=68d7d6e9aaf81ebbf0f5228a3eadf769f1af0a7b0cb3dcb8fb8885dc5d93054f '
# test
# ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=f7706a17b6de3ab6318806d1ed3d31fc68642bee99693280ee9a1591ab978c51 '
ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=f7706a17b6de3ab6318806d1ed3d31fc68642bee99693280ee9a1591ab978c51 '
# portrait
ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=504a5f386e7bde888f655d5dccd533a822adeb888de5fc0004a2b2498925a1c4'
# ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=504a5f386e7bde888f655d5dccd533a822adeb888de5fc0004a2b2498925a1c4'
ding_content = json.dumps(ding_talk)
ding_header = {'Content-Type': 'application/json;charset=UTF-8'}
......
......@@ -51,11 +51,11 @@ public class PortraitMonitorMain {
// 获得流处理环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// env.enableCheckpointing(1000);
// env.setStateBackend(new FsStateBackend(checkpointPath));
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
// CheckpointConfig config = env.getCheckpointConfig();
// config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.enableCheckpointing(1000);
env.setStateBackend(new FsStateBackend(checkpointPath));
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 获取数据源
DataStream portraitErrDataStream = new PortraitKafkaSource(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment