Commit 27aa4705 authored by 赵建伟's avatar 赵建伟

update codes

parent c47bbeaf
package com.gmei.data.monitor; package com.gmei.data.monitor;
import com.gmei.data.monitor.operator.PortraitMonitorErrOperator;
import com.gmei.data.monitor.operator.PortraitMonitorShdOperator; import com.gmei.data.monitor.operator.PortraitMonitorShdOperator;
import com.gmei.data.monitor.operator.PortraitMonitorSucOperator; import com.gmei.data.monitor.operator.PortraitMonitorSucOperator;
import com.gmei.data.monitor.source.PortraitKafkaSource; import com.gmei.data.monitor.source.PortraitKafkaSource;
...@@ -13,8 +12,6 @@ import org.apache.flink.streaming.api.datastream.DataStream; ...@@ -13,8 +12,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.text.SimpleDateFormat;
/** /**
* @ClassName PortraitMonitorMain * @ClassName PortraitMonitorMain
* @Description: 画像打点实时监控主入口 * @Description: 画像打点实时监控主入口
...@@ -33,7 +30,6 @@ public class PortraitMonitorMain { ...@@ -33,7 +30,6 @@ public class PortraitMonitorMain {
String maidianInTopic = parameterTool.get("maidianInTopic", "test11"); String maidianInTopic = parameterTool.get("maidianInTopic", "test11");
String backendInTopic = parameterTool.get("backendInTopic","test12"); String backendInTopic = parameterTool.get("backendInTopic","test12");
String portraitSucInTopic = parameterTool.get("portraitSucInTopic","test13"); String portraitSucInTopic = parameterTool.get("portraitSucInTopic","test13");
String portraitErrGroupId = parameterTool.get("portraitErrGroupId","flink_monitor_err");
String portraitShdGroupId = parameterTool.get("portraitShdGroupId","flink_monitor_shd"); String portraitShdGroupId = parameterTool.get("portraitShdGroupId","flink_monitor_shd");
String portraitSucGroupId = parameterTool.get("portraitSucGroupId","flink_monitor_suc"); String portraitSucGroupId = parameterTool.get("portraitSucGroupId","flink_monitor_suc");
Integer windowSize = parameterTool.getInt("windowSize",60); Integer windowSize = parameterTool.getInt("windowSize",60);
...@@ -57,18 +53,6 @@ public class PortraitMonitorMain { ...@@ -57,18 +53,6 @@ public class PortraitMonitorMain {
CheckpointConfig config = env.getCheckpointConfig(); CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 获取数据源
DataStream portraitErrDataStream = new PortraitKafkaSource(
env,
inBrokers,
maidianInTopic,
backendInTopic,
portraitErrGroupId,
batchSize,
isStartFromEarliest,
isStartFromLatest,
startTime
).getInstance();
DataStream portraitShdDataStream = new PortraitKafkaSource( DataStream portraitShdDataStream = new PortraitKafkaSource(
env, env,
inBrokers, inBrokers,
...@@ -92,7 +76,6 @@ public class PortraitMonitorMain { ...@@ -92,7 +76,6 @@ public class PortraitMonitorMain {
).getInstance(); ).getInstance();
// 执行处理核心逻辑 // 执行处理核心逻辑
new PortraitMonitorErrOperator(portraitErrDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new PortraitMonitorShdOperator(portraitShdDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run(); new PortraitMonitorShdOperator(portraitShdDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new PortraitMonitorSucOperator(portraitSucDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run(); new PortraitMonitorSucOperator(portraitSucDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
......
package com.gmei.data.monitor; package com.gmei.data.monitor;
import com.gmei.data.monitor.operator.PortraitMonitorErrOperator;
import com.gmei.data.monitor.operator.PortraitMonitorShdOperator; import com.gmei.data.monitor.operator.PortraitMonitorShdOperator;
import com.gmei.data.monitor.operator.PortraitMonitorSucOperator; import com.gmei.data.monitor.operator.PortraitMonitorSucOperator;
import com.gmei.data.monitor.source.PortraitKafkaSource; import com.gmei.data.monitor.source.PortraitKafkaSource;
...@@ -20,7 +21,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -20,7 +21,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* @Date 2020/3/18 * @Date 2020/3/18
* @Version V1.0 * @Version V1.0
**/ **/
public class PortraitMonitorMainShdSuc { public class PortraitMonitorMainAll {
public static void main(String[] args) throws Exception{ public static void main(String[] args) throws Exception{
// 获取运行参数 // 获取运行参数
...@@ -55,17 +56,17 @@ public class PortraitMonitorMainShdSuc { ...@@ -55,17 +56,17 @@ public class PortraitMonitorMainShdSuc {
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 获取数据源 // 获取数据源
// DataStream portraitErrDataStream = new PortraitKafkaSource( DataStream portraitErrDataStream = new PortraitKafkaSource(
// env, env,
// inBrokers, inBrokers,
// maidianInTopic, maidianInTopic,
// backendInTopic, backendInTopic,
// portraitErrGroupId, portraitErrGroupId,
// batchSize, batchSize,
// isStartFromEarliest, isStartFromEarliest,
// isStartFromLatest, isStartFromLatest,
// startTime startTime
// ).getInstance(); ).getInstance();
DataStream portraitShdDataStream = new PortraitKafkaSource( DataStream portraitShdDataStream = new PortraitKafkaSource(
env, env,
inBrokers, inBrokers,
...@@ -89,7 +90,7 @@ public class PortraitMonitorMainShdSuc { ...@@ -89,7 +90,7 @@ public class PortraitMonitorMainShdSuc {
).getInstance(); ).getInstance();
// 执行处理核心逻辑 // 执行处理核心逻辑
// new PortraitMonitorErrOperator(portraitErrDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism).run(); new PortraitMonitorErrOperator(portraitErrDataStream,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new PortraitMonitorShdOperator(portraitShdDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run(); new PortraitMonitorShdOperator(portraitShdDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
new PortraitMonitorSucOperator(portraitSucDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run(); new PortraitMonitorSucOperator(portraitSucDataStream,windowSize,slideSize,outJdbcUrl,maxRetry,retryInteral,parallelism).run();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment