Commit a741c21b authored by 赵建伟's avatar 赵建伟

update codes

parent f87d885a
......@@ -6,7 +6,7 @@ export HADOOP_HOME=/opt/hadoop-2.6.0-cdh5.16.1
export JAVA_OPTS="-Xms1024m -Xmx8192m -XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC"
$FLINK_HOME/bin/flink run \
-m yarn-client \
-m yarn-cluster \
-ynm portrait-monitor \
-yqu flink \
-yn 2 \
......@@ -29,7 +29,7 @@ $JAR_DIR/flink-monitor-1.0-SNAPSHOT.jar \
--maxRetry 3 \
--retryInteral 3000 \
--checkpointPath 'hdfs://bj-gmei-hdfs/user/data/flink/flink-monitor/checkpoint' \
--parallelism 10
--parallelism 12
#$FLINK_HOME/bin/flink run \
......
......@@ -36,8 +36,8 @@ public class PortraitMonitorMain {
String portraitErrGroupId = parameterTool.get("portraitErrGroupId","flink_monitor_err");
String portraitShdGroupId = parameterTool.get("portraitShdGroupId","flink_monitor_shd");
String portraitSucGroupId = parameterTool.get("portraitSucGroupId","flink_monitor_suc");
Integer windowSize = parameterTool.getInt("windowSize",60000);
Integer slideSize = parameterTool.getInt("slideSize",3);
Integer windowSize = parameterTool.getInt("windowSize",60);
Integer slideSize = parameterTool.getInt("slideSize",60);
String outJdbcUrl = parameterTool.get("outJdbcUrl",
"jdbc:mysql://test002:3306/flink_monitor?user=root&password=5OqYM^zLwotJ3oSo&autoReconnect=true&useSSL=false");
Integer maxRetry = parameterTool.getInt("maxRetry",3);
......@@ -49,12 +49,12 @@ public class PortraitMonitorMain {
// 获得流处理环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(1000);
env.setStateBackend(new FsStateBackend(checkpointPath));
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// env.enableCheckpointing(1000);
// env.setStateBackend(new FsStateBackend(checkpointPath));
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 3000));
// CheckpointConfig config = env.getCheckpointConfig();
// config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 获取数据源
DataStream portraitErrDataStream = new PortraitKafkaSource(
......
......@@ -2,7 +2,6 @@ package com.gmei.data.monitor.operator;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gmei.data.monitor.bean.GmPortraitResult;
import com.gmei.data.monitor.bean.TblMonitorPortraitShd;
import com.gmei.data.monitor.sink.PortraitShdMysqlSink;
import com.gmei.data.monitor.utils.DateUtils;
......@@ -15,7 +14,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment